saulbein opened a new issue, #27818:
URL: https://github.com/apache/airflow/issues/27818
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
A user with permission to manually trigger DAGs can trigger a DAG with a
run_id that matches the pattern used for scheduled runs, causing the
scheduler to crash with a database unique-key violation:
```
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 76, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 912, in _do_scheduling
    self._create_dagruns_for_dags(guard, session)
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/retries.py", line 76, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/usr/local/lib/python3.8/site-packages/tenacity/__init__.py", line 384, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.8/site-packages/tenacity/__init__.py", line 351, in iter
    return fut.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/retries.py", line 85, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 980, in _create_dagruns_for_dags
    self._create_dag_runs(query.all(), session)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2772, in all
    return self._iter().all()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2915, in _iter
    result = self.session.execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1713, in execute
    conn = self._connection_for_bind(bind)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
DETAIL:  Key (dag_id, run_id)=(example_branch_dop_operator_v3, scheduled__2022-11-21T12:26:00+00:00) already exists.
[SQL: INSERT INTO dag_run (dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, data_interval_start, data_interval_end, last_scheduling_decision, dag_hash, log_template_id) VALUES (%(dag_id)s, %(queued_at)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, %(dag_hash)s, (SELECT max(log_template.id) AS max_1
FROM log_template)) RETURNING dag_run.id]
[parameters: {'dag_id': 'example_branch_dop_operator_v3', 'queued_at': datetime.datetime(2022, 11, 21, 12, 30, 20, 244878, tzinfo=Timezone('UTC')), 'execution_date': DateTime(2022, 11, 21, 12, 26, 0, tzinfo=Timezone('UTC')), 'start_date': None, 'end_date': None, 'state': <DagRunState.QUEUED: 'queued'>, 'run_id': 'scheduled__2022-11-21T12:26:00+00:00', 'creating_job_id': 7, 'external_trigger': False, 'run_type': <DagRunType.SCHEDULED: 'scheduled'>, 'conf': <psycopg2.extensions.Binary object at 0x7fddcb174f90>, 'data_interval_start': DateTime(2022, 11, 21, 12, 26, 0, tzinfo=Timezone('UTC')), 'data_interval_end': DateTime(2022, 11, 21, 12, 27, 0, tzinfo=Timezone('UTC')), 'last_scheduling_decision': None, 'dag_hash': '7aad83be6a1698b4ce6c850647434daf'}]
(Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
```
Worse yet, the scheduler keeps crashing with the same exception after a
restart.
### What you think should happen instead
A user should not be able to crash the scheduler from the UI.
I see two alternatives for solving this:
1. Reject any custom run_id that would (or could) collide with a scheduled
one, preventing the situation up front.
2. Handle the database error and assign a different run_id to the scheduled
run.
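Alternative 1 could look something like the sketch below. This is a hypothetical helper, not actual Airflow code; the reserved prefixes are an assumption based on the `run_id` formats Airflow generates (e.g. the `scheduled__...` id in the traceback above):

```python
# Hypothetical validation for alternative 1: reject user-supplied run_ids
# that use the prefixes the scheduler reserves for generated run ids.
# The prefix list is an assumption for illustration, modeled on the
# run_id formats Airflow produces for its own run types.
RESERVED_PREFIXES = ("scheduled__", "backfill__", "dataset_triggered__")


def validate_manual_run_id(run_id: str) -> None:
    """Raise ValueError if a manual run_id could collide with a generated one."""
    for prefix in RESERVED_PREFIXES:
        if run_id.startswith(prefix):
            raise ValueError(
                f"run_id {run_id!r} uses the reserved prefix {prefix!r}; "
                "please choose a different run_id"
            )


validate_manual_run_id("my_manual_run")  # accepted
try:
    validate_manual_run_id("scheduled__2022-11-21T12:00:00+00:00")
except ValueError as err:
    print(err)  # rejected before it can ever collide
```

Running this check in the trigger form / API would make the collision impossible rather than something the scheduler has to recover from.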
### How to reproduce
1. Find an unpaused DAG.
2. Trigger the DAG w/ config, setting the run id to something like
`scheduled__2022-11-21T12:00:00+00:00` (adjust the time to a point in the
future that has no run yet).
3. Let the manual DAG run finish.
4. Wait for the scheduler to try to schedule another DAG run with the same
run id.
5. :boom:
6. Attempt to restart the scheduler.
7. :boom:
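A colliding run id for step 2 can be built like this; the `scheduled__` + ISO-8601 logical-date format is inferred from the run_id shown in the traceback above, and the helper itself is just an illustration:

```python
# Illustrative helper: build a run_id in the same format the scheduler
# uses ("scheduled__" + logical date in ISO-8601), a few minutes in the
# future so the scheduler has not created that run yet.
from datetime import datetime, timedelta, timezone


def future_scheduled_run_id(minutes_ahead: int = 5) -> str:
    when = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    when += timedelta(minutes=minutes_ahead)
    return f"scheduled__{when.isoformat()}"


print(future_scheduled_run_id())
# prints something like: scheduled__2022-11-21T12:31:00+00:00
```

Paste the resulting id into the "Trigger DAG w/ config" form, then wait for the scheduler to reach that logical date.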
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
apache-airflow==2.3.4
apache-airflow-providers-postgres==5.2.2
### Deployment
Docker-Compose
### Deployment details
I'm using a Postgres docker container as the metadata database, linked via
docker networking to the scheduler and the rest of the components. The
scheduler, workers, and webserver all run in separate containers (using
CeleryExecutor backed by a Redis container), though I do not think that is
relevant here.
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)