Oduig opened a new issue, #29461:
URL: https://github.com/apache/airflow/issues/29461
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
The `on_failure_callback` is invoked twice when a DAG run fails due to a
timeout, which leads to duplicate failure alerts in our Slack channels. This
happens reliably; the two invocations are roughly 5 seconds apart, and the
context and full stack trace are identical in both.
### What you think should happen instead
The `on_failure_callback` should only be invoked once.
### How to reproduce
Here is the DAG that triggers the effect. The `dagrun_timeout` of one minute is shorter than the Databricks run, so every run fails with a timeout and the failure callback fires:
```
import logging
from datetime import timedelta

import pendulum
import requests

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

dag_settings = {
    "dag_id": "INT_tableau_others_recommendation_classifications",
    "max_active_runs": 1,
    "dagrun_timeout": timedelta(minutes=1),
    "start_date": pendulum.today("UTC"),
    "default_args": {
        "owner": "airflow",
        "catchup": False
    },
    "tags": ["env:testing"],
    "on_failure_callback": (
        lambda context: [
            logging.info(f"Minimal example - failure callback with context: {context}"),
            # Since logging.info does not work for us, we add a request to RequestBin
            # See https://github.com/apache/airflow/issues/29442
            requests.post(
                "https://<removed-for-publication>.m.pipedream.net",
                json={"payload": f"Minimal example - failure callback with context: {context}"}
            )
        ]
    )
}

dag = DAG(**dag_settings)
logging.info(f"Minimal example - Created DAG {dag.dag_id}.")

params = [
    "--class", "com.example.Launcher",
    "dbfs:/libraries/scala/example-fat-jar.jar"
]

DatabricksSubmitRunOperator(
    task_id="update_dataset",
    dag=dag,
    databricks_conn_id="databricks_default",
    spark_submit_task={
        "parameters": params
    },
    new_cluster={
        "spark_version": "10.4.x-cpu-ml-scala2.12",
        "spark_env_vars": {
            "JNAME": "zulu11-ca-amd64"  # Use JDK 11
        },
        "spark_conf": {
            "spark.sql.session.timeZone": "UTC"
        },
        "aws_attributes": {
            "instance_profile_arn": "arn:aws:iam::765<removed-for-publication>:instance-profile/DatabricksExecution"
        },
        "instance_pool_id": "1011-<removed-for-publication>",
        "driver_instance_pool_id": "1011-<removed-for-publication>",
        "num_workers": 1
    }
)
```
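For anyone reproducing this without a RequestBin endpoint, a callback that appends to a local file should show the same double invocation. Here is a rough sketch of such a callback (the `/tmp` path and the helper name are arbitrary choices of ours, and reading the file assumes access to the DAG-processor host, since DAG-level callbacks run there):
```
import json
import logging
from datetime import datetime, timezone


def log_failure_to_file(context):
    # Append one JSON line per callback invocation so duplicates are easy to spot.
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "dag_id": context["dag"].dag_id,
        "run_id": context["run_id"],
    }
    with open("/tmp/failure_callback_invocations.log", "a") as f:
        f.write(json.dumps(record) + "\n")
    logging.info("Failure callback invoked: %s", record)
```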
### Operating System
composer-2.0.32
### Versions of Apache Airflow Providers
Here is the full `requirements.txt`:
```
apache-airflow-providers-databricks==4.0.0
databricks-sql-connector==2.1.0
apache-beam~=2.43.0
sqlalchemy-bigquery==1.5.0
requests~=2.28.1
apache-airflow-providers-tableau==4.0.0
apache-airflow-providers-sendgrid==3.1.0
python-dotenv==0.21.0
urllib3~=1.26.8
tableauserverclient==0.23
apache-airflow-providers-http==4.1.0
# time library in airflow
pendulum==2.1.2
```
### Deployment
Composer
### Deployment details
We are running a Cloud Composer environment with image
`composer-2.0.32-airflow-2.3.4`.
### Anything else
Since logging inside the callback does not work for us (#29442), here is a
screenshot from RequestBin. Both invocations share the same log line; the
context and stack trace are also identical.

Here is the complete stack trace (identical for both invocations).
```
File "/opt/python3.8/bin/airflow", line 8, in <module>
sys.exit(main())
File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py",
line 38, in main
args.func(args)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line
51, in command
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py",
line 101, in wrapper
return f(*args, **kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
line 76, in scheduler
_run_scheduler_job(args=args)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
line 46, in _run_scheduler_job
job.run()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/base_job.py", line
244, in run
self._execute()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py",
line 748, in _execute
self.processor_agent.start()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py",
line 158, in start
process.start()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121,
in start
self._popen = self._Popen(self)
File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277,
in _Popen
return Popen(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line
19, in __init__
self._launch(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line
75, in _launch
code = process_obj._bootstrap(parent_sentinel=child_r)
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315,
in _bootstrap
self.run()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108,
in run
self._target(*self._args, **self._kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py",
line 255, in _run_processor_manager
processor_manager.start()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py",
line 486, in start
return self._run_parsing_loop()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py",
line 600, in _run_parsing_loop
self.start_new_processes()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py",
line 1003, in start_new_processes
processor.start()
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py",
line 194, in start
process.start()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121,
in start
self._popen = self._Popen(self)
File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277,
in _Popen
return Popen(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line
19, in __init__
self._launch(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line
75, in _launch
code = process_obj._bootstrap(parent_sentinel=child_r)
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315,
in _bootstrap
self.run()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108,
in run
self._target(*self._args, **self._kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py",
line 155, in _run_file_processor
result: Tuple[int, int] = dag_file_processor.process_file(
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71,
in wrapper
return func(*args, session=session, **kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py",
line 656, in process_file
self.execute_callbacks(dagbag, callback_requests)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71,
in wrapper
return func(*args, session=session, **kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py",
line 581, in execute_callbacks
self._execute_dag_callbacks(dagbag, request, session)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68,
in wrapper
return func(*args, **kwargs)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py",
line 595, in _execute_dag_callbacks
dag.handle_callback(
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68,
in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dag.py",
line 1178, in handle_callback
callback(context)
File "/home/airflow/gcs/dags/integration_test/example.py"
```
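As a stopgap we are considering making the callback idempotent per run, so duplicate invocations only alert once. A rough sketch follows (the `alert_once` wrapper and the `/tmp` marker path are our own, not an Airflow API; it assumes both invocations happen on the same DAG-processor host, which matches the ~5 second gap we observe):
```
import logging
from pathlib import Path


def alert_once(callback):
    """Fire a DAG-level failure callback at most once per dag_id/run_id."""
    def wrapper(context):
        # Marker file keyed by dag_id and run_id; assumes both invocations
        # happen on the same host, so the second one sees the marker and skips.
        marker = Path(f"/tmp/failure_alert_{context['dag'].dag_id}_{context['run_id']}")
        if marker.exists():
            logging.info("Failure callback already fired for %s, skipping.", marker.name)
            return
        marker.touch()
        callback(context)
    return wrapper
```
It would be used as `"on_failure_callback": alert_once(<the real alert function>)` in `dag_settings`, but we would obviously prefer a fix for the double dispatch itself.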
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)