saulbein opened a new issue, #27818:
URL: https://github.com/apache/airflow/issues/27818

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   A user with access to manually triggering DAGs can trigger a DAG, provide a run_id that matches the pattern used when creating scheduled runs, and cause the scheduler to crash due to a database unique key violation:
   ```
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 76, in scheduler
       _run_scheduler_job(args=args)
     File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 912, in _do_scheduling
       self._create_dagruns_for_dags(guard, session)
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/retries.py", line 76, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
     File "/usr/local/lib/python3.8/site-packages/tenacity/__init__.py", line 384, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.8/site-packages/tenacity/__init__.py", line 351, in iter
       return fut.result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
       return self.__get_result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.8/site-packages/airflow/utils/retries.py", line 85, in wrapped_function
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 980, in _create_dagruns_for_dags
       self._create_dag_runs(query.all(), session)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2772, in all
       return self._iter().all()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2915, in _iter
       result = self.session.execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1713, in execute
       conn = self._connection_for_bind(bind)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
       self._assert_active()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
   DETAIL:  Key (dag_id, run_id)=(example_branch_dop_operator_v3, scheduled__2022-11-21T12:26:00+00:00) already exists.
   
   [SQL: INSERT INTO dag_run (dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, data_interval_start, data_interval_end, last_scheduling_decision, dag_hash, log_template_id) VALUES (%(dag_id)s, %(queued_at)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(last_scheduling_decision)s, %(dag_hash)s, (SELECT max(log_template.id) AS max_1 
   FROM log_template)) RETURNING dag_run.id]
   [parameters: {'dag_id': 'example_branch_dop_operator_v3', 'queued_at': datetime.datetime(2022, 11, 21, 12, 30, 20, 244878, tzinfo=Timezone('UTC')), 'execution_date': DateTime(2022, 11, 21, 12, 26, 0, tzinfo=Timezone('UTC')), 'start_date': None, 'end_date': None, 'state': <DagRunState.QUEUED: 'queued'>, 'run_id': 'scheduled__2022-11-21T12:26:00+00:00', 'creating_job_id': 7, 'external_trigger': False, 'run_type': <DagRunType.SCHEDULED: 'scheduled'>, 'conf': <psycopg2.extensions.Binary object at 0x7fddcb174f90>, 'data_interval_start': DateTime(2022, 11, 21, 12, 26, 0, tzinfo=Timezone('UTC')), 'data_interval_end': DateTime(2022, 11, 21, 12, 27, 0, tzinfo=Timezone('UTC')), 'last_scheduling_decision': None, 'dag_hash': '7aad83be6a1698b4ce6c850647434daf'}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
   ```
   
   Worse yet, the scheduler will keep crashing after a restart with the same 
exception.
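   The failure mode can be reproduced in miniature without Airflow. The `dag_run` table has a unique constraint on `(dag_id, run_id)` (visible in the traceback as `dag_run_dag_id_run_id_key`), so once a manual run has claimed a scheduled-style run_id, the scheduler's own INSERT for that interval must fail. A minimal sketch using sqlite3, with the table reduced to the relevant columns:
   
   ```python
   import sqlite3
   
   conn = sqlite3.connect(":memory:")
   conn.execute(
       """CREATE TABLE dag_run (
              dag_id TEXT NOT NULL,
              run_id TEXT NOT NULL,
              run_type TEXT NOT NULL,
              UNIQUE (dag_id, run_id)  -- mirrors dag_run_dag_id_run_id_key
          )"""
   )
   
   # A user manually triggers the DAG with a scheduled-style run_id...
   conn.execute(
       "INSERT INTO dag_run VALUES (?, ?, ?)",
       ("example_branch_dop_operator_v3", "scheduled__2022-11-21T12:26:00+00:00", "manual"),
   )
   
   # ...then the scheduler tries to create its own run for the same interval
   # and hits the unique constraint, which surfaces as the UniqueViolation above.
   try:
       conn.execute(
           "INSERT INTO dag_run VALUES (?, ?, ?)",
           ("example_branch_dop_operator_v3", "scheduled__2022-11-21T12:26:00+00:00", "scheduled"),
       )
   except sqlite3.IntegrityError as exc:
       print(f"scheduler insert rejected: {exc}")
   ```
   
   In Airflow the failing INSERT happens inside the scheduler loop itself (see `_run_scheduler_loop` in the traceback), so the exception takes the whole scheduler down rather than failing a single request.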
   
   ### What you think should happen instead
   
   A user should not be able to crash the scheduler from the UI.  
   I see two alternatives for solving this:
   
   1. Reject custom run_ids that would (or could) collide with scheduler-generated ones, preventing this situation from happening.
   2. Handle the database error and assign a different run_id to the scheduled run.
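   Option 1 could be sketched as a simple prefix check at trigger time. This is a hypothetical illustration, not Airflow's actual validation code; the reserved prefix list is assumed from the observed `scheduled__...` run_id and the `run_type` values in the traceback:
   
   ```python
   # Prefixes the scheduler appears to use when generating run_ids for
   # non-manual runs (assumed from observed values such as
   # "scheduled__2022-11-21T12:26:00+00:00"; not an exhaustive list).
   RESERVED_RUN_ID_PREFIXES = ("scheduled__", "backfill__")
   
   def validate_manual_run_id(run_id: str) -> None:
       """Reject custom run_ids that could collide with scheduler-generated ones."""
       for prefix in RESERVED_RUN_ID_PREFIXES:
           if run_id.startswith(prefix):
               raise ValueError(
                   f"run_id {run_id!r} is reserved: manual runs may not use "
                   f"the {prefix!r} prefix"
               )
   
   validate_manual_run_id("my_manual_run_2022_11_21")  # accepted
   ```
   
   A check like this in the trigger form and the API would fail fast at the point of user input instead of letting the collision surface later inside the scheduler loop.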
   
   ### How to reproduce
   
   1. Find an unpaused DAG.
   2. Trigger the DAG with config and set the run id to something like `scheduled__2022-11-21T12:00:00+00:00` (adjust the time to a point in the future for which no run exists yet).
   3. Let the manual DAG run finish.
   4. Wait for the scheduler to try to schedule another DAG run with the same run id.
   5. :boom:
   6. Attempt to restart the scheduler.
   7. :boom:
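   For step 2, the colliding run_id can be derived from the pattern visible in the traceback: `scheduled__` followed by the logical date in ISO-8601 form with a UTC offset. A sketch (the format is inferred from the observed value, not taken from Airflow internals):
   
   ```python
   from datetime import datetime, timedelta, timezone
   
   def scheduled_style_run_id(logical_date: datetime) -> str:
       # Matches the observed pattern, e.g. "scheduled__2022-11-21T12:26:00+00:00"
       return f"scheduled__{logical_date.isoformat()}"
   
   # Pick a slot a few minutes in the future so the scheduler has not created
   # that run yet, rounded down to the whole minute to line up with an
   # every-minute schedule like the one in this report.
   future = (datetime.now(timezone.utc) + timedelta(minutes=5)).replace(second=0, microsecond=0)
   print(scheduled_style_run_id(future))
   ```
   
   Pasting the printed value into the "Trigger DAG w/ config" form as the run id sets up the collision for the scheduler's next pass over that interval.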
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.3.4
   apache-airflow-providers-postgres==5.2.2
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   I'm using a Postgres docker container as a metadata database that is linked 
via docker networking to the scheduler and the rest of the components. 
Scheduler, workers and webserver are all running in separate containers (using 
CeleryExecutor backed by a Redis container), though I do not think it is 
relevant in this case. 
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

