jscheffl opened a new issue, #53610: URL: https://github.com/apache/airflow/issues/53610
### Apache Airflow version

main (development)

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

During tests of PR https://github.com/apache/airflow/pull/53035 it came to light that deferred tasks (which the HITL tasks under test are) have problems in EdgeExecutor when they are re-scheduled. In the tested case the task was initially scheduled on EdgeExecutor and then deferred to the triggerer. The HITL task in the test was configured with a timeout. To process the timeout the task was scheduled a second time, which caused a unique constraint violation on the `edge_job` table in the DB with the following error:

```
root@f2921a52480b:/opt/airflow# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
[2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting the scheduler [2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor [2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - Adopting or resetting orphaned tasks for active dag runs [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks up for execution: <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]> <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]> <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]> [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 1/16 running and queued tasks [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting the following tasks to queued state: <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]> <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]> <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]> [2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: EdgeExecutor(parallelism=32) [2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - Exception when executing SchedulerJob._run_scheduler_loop Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values cur.execute(b''.join(parts)) psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists. The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute self._run_scheduler_loop() File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop num_queued_tis = self._do_scheduling(session) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling guard.commit() File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit self.session.commit() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit self._transaction.commit(_to_root=self.future) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit self._prepare_impl() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl 
self.session.flush() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush self._flush(objects) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush with util.safe_reraise(): ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush flush_context.execute() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute rec.execute(self) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute util.preloaded.orm_persistence.save_obj( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj _emit_insert_statements( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements c = connection._execute_20( ^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context self._handle_dbapi_exception( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception 
util.raise_( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values cur.execute(b''.join(parts)) sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists. [SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)] [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... 
,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command' : '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})] (Background on this error at: https://sqlalche.me/e/14/gkpj) [2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down EdgeExecutor [2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited execute loop Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values cur.execute(b''.join(parts)) psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists. 
The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/usr/local/bin/airflow", line 10, in <module> sys.exit(main()) ^^^^^^ File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main args.func(args) File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, in command return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler run_command_with_daemon_option( File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option callback() File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda> callback=lambda: _run_scheduler_job(args), ^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job run_job(job=job_runner.job, execute_callable=job_runner._execute) File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper return func(*args, session=session, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job return execute_job(job, execute_callable=execute_callable) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job ret = execute_callable() ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute self._run_scheduler_loop() File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 
1283, in _run_scheduler_loop num_queued_tis = self._do_scheduling(session) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling guard.commit() File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit self.session.commit() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit self._transaction.commit(_to_root=self.future) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit self._prepare_impl() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl self.session.flush() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush self._flush(objects) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush with util.safe_reraise(): ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush flush_context.execute() File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute rec.execute(self) File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute util.preloaded.orm_persistence.save_obj( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj _emit_insert_statements( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements c = connection._execute_20( ^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20 return meth(self, args_10style, 
kwargs_10style, execution_options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection return connection._execute_clauseelement( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement ret = self._execute_context( ^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context self._handle_dbapi_exception( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception util.raise_( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values cur.execute(b''.join(parts)) sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists. 
[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)] [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command' : '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... 
"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})] (Background on this error at: https://sqlalche.me/e/14/gkpj) ``` Also restart of Scheduler did not fix the problem. ### What you think should happen instead? No failure. Or if Deferred tasks are in general not working on EdgeExecutor they should be refused to be acceped first-hand already. Better of course would be to handle the re-scheduling of the same tasks. ### How to reproduce Run HITL Dag (if no UI merged using PR #53035 to have a UI) and wait on timeout of task `valid_input_and_options` that has a timeout of 1 minute. Scheduler will crash ### Operating System Ununtu 24.04 ### Versions of Apache Airflow Providers Edge3 from main but probably all versions of Edge Provider released will have this problem ### Deployment Other ### Deployment details Started from main using breeze. Used command line: `breeze start-airflow --python 3.12 --backend postgres --executor EdgeExecutor --load-example-dags` ### Anything else? 
_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
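
To illustrate the failure mode reported under "What happened?", here is a minimal sketch, not the Edge provider's actual code or its fix. It uses an in-memory SQLite table as a hypothetical, reduced stand-in for `edge_job`, with the same composite primary key named in the error message. The helper names (`plain_insert`, `upsert`, `demo`) and the `state` value of the first row are assumptions made for illustration. It shows that a second plain `INSERT` with an unchanged `try_number` collides, while an upsert on the key columns would let the same task be queued again.

```python
import sqlite3

# Hypothetical, reduced stand-in for the edge_job table: only the key columns
# from the error message plus a state column.
SCHEMA = """
CREATE TABLE edge_job (
    dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, try_number INTEGER,
    state TEXT,
    PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
)
"""

# Key values copied from the UniqueViolation detail in the log.
KEY = (
    "example_hitl_operator",
    "valid_input_and_options",
    "manual__2025-07-21T18:55:40.109637+00:00",
    -1,
    1,
)


def plain_insert(conn, key, state):
    """Insert a job row; fails if a row with the same key already exists."""
    conn.execute("INSERT INTO edge_job VALUES (?, ?, ?, ?, ?, ?)", (*key, state))


def upsert(conn, key, state):
    """Insert a job row, or reset the state of the existing row with that key."""
    conn.execute(
        "INSERT INTO edge_job VALUES (?, ?, ?, ?, ?, ?) "
        "ON CONFLICT (dag_id, task_id, run_id, map_index, try_number) "
        "DO UPDATE SET state = excluded.state",
        (*key, state),
    )


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    # First scheduling, before the task deferred (state value is illustrative).
    plain_insert(conn, KEY, "success")
    # Second scheduling after the timeout: same key, try_number still 1.
    try:
        plain_insert(conn, KEY, "queued")
        collided = False
    except sqlite3.IntegrityError:
        collided = True
    # An upsert tolerates the existing row and re-queues it.
    upsert(conn, KEY, "queued")
    rows = conn.execute("SELECT state FROM edge_job").fetchall()
    return collided, rows


if __name__ == "__main__":
    print(demo())
```

Whether an upsert, deleting the stale row, or refusing deferred tasks is the right approach for EdgeExecutor is left open here; the sketch only demonstrates why re-queuing with an unchanged `try_number` hits the primary key.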
