jscheffl opened a new issue, #53610:
URL: https://github.com/apache/airflow/issues/53610

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   During tests of PR https://github.com/apache/airflow/pull/53035 it came to 
light that deferred tasks (which HITL tasks are) run into problems in the 
EdgeExecutor when they are re-scheduled.
   
   In the tested case the task was initially scheduled on the EdgeExecutor and 
then deferred to the triggerer. The HITL task in the test was configured with a 
timeout. To process the timeout, the task was scheduled a second time, which 
caused a unique constraint violation on the edge_job table in the DB with the 
following error:
   ```
   root@f2921a52480b:/opt/airflow# airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44
 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is 
deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
   [2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting 
the scheduler
   [2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded 
executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor
   [2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - 
Adopting or resetting orphaned tasks for active dag runs
   [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks 
up for execution:
           <TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.wait_for_option 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
   [2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 0/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 1/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG 
example_hitl_operator has 0/16 running and queued tasks
   [2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting 
the following tasks to queued state:
           <TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.wait_for_option 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
           <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
   [2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying 
to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input 
manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: 
example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 
[scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options 
manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: 
EdgeExecutor(parallelism=32)
   [2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1002, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1283, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1430, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 
401, in commit
       self.session.commit()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 832, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 811, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
            ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
456, in execute
       rec.execute(self)
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
245, in save_obj
       _emit_insert_statements(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
1097, in _emit_insert_statements
       c = connection._execute_20(
           ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   [SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, 
state, queue, concurrency_slots, command, queued_dttm, edge_worker, 
last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, 
%(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, 
%(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
   [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 
'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 
'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 
'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2
 ... (675 characters truncated) ... 
,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 
'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command'
 : 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND
 ... (677 characters truncated) ... 
"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 
'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj
 ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963
 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 
'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   [2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down 
EdgeExecutor
   [2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited 
execute loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, 
in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in 
wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
52, in scheduler
       run_command_with_daemon_option(
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
       callback()
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 
43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, 
in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in 
run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in 
execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1002, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1283, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", 
line 1430, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 
401, in commit
       self.session.commit()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 832, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 811, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
            ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
456, in execute
       rec.execute(self)
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 
630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
245, in save_obj
       _emit_insert_statements(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 
1097, in _emit_insert_statements
       c = connection._execute_20(
           ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1890, in _execute_context
       self.dialect.do_executemany(
     File 
"/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 982, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
                                        ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 
1299, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "edge_job_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index, 
try_number)=(example_hitl_operator, valid_input_and_options, 
manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
   
   [SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, 
state, queue, concurrency_slots, command, queued_dttm, edge_worker, 
last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, 
%(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, 
%(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
   [parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 
'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 
'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 
'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2
 ... (675 characters truncated) ... 
,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 
'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command'
 : 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND
 ... (677 characters truncated) ... 
"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}',
 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 
'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 
'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 
'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 
'concurrency_slots': 1, 'command': 
'{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj
 ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.10963
 7+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 
'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, 
tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   ```
   A restart of the scheduler also did not fix the problem.
   
   ### What you think should happen instead?
   
   No failure. Or, if deferred tasks are in general not supported on the 
EdgeExecutor, they should be rejected up front rather than accepted. Better, of 
course, would be to handle the re-scheduling of the same task gracefully.
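
   One way to handle the re-scheduling gracefully would be to make the enqueue 
idempotent on the `edge_job` primary key instead of doing a plain INSERT. The 
sketch below is only an illustration with a hypothetical, simplified stand-in 
model (`EdgeJob` here is not the provider's actual ORM class): in SQLAlchemy, 
`session.merge()` updates an existing row with the same composite key rather 
than raising an `IntegrityError`:

   ```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class EdgeJob(Base):
    # Simplified stand-in for the edge_job table; same composite primary
    # key as the real "edge_job_pkey" constraint in the error above.
    __tablename__ = "edge_job"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    run_id = Column(String, primary_key=True)
    map_index = Column(Integer, primary_key=True)
    try_number = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

key = dict(dag_id="example_hitl_operator", task_id="valid_input_and_options",
           run_id="manual__...", map_index=-1, try_number=1)

with Session(engine) as session:
    # First scheduling: a plain insert works.
    session.add(EdgeJob(**key, state="queued"))
    session.commit()

with Session(engine) as session:
    # Re-scheduling after deferral arrives with the *same* composite key
    # (try_number is unchanged). merge() updates the existing row instead
    # of inserting a duplicate, so no UniqueViolation is raised.
    session.merge(EdgeJob(**key, state="queued"))
    session.commit()
    print(session.query(EdgeJob).count())  # still one row
   ```

   On PostgreSQL the same effect could be achieved with `INSERT ... ON CONFLICT 
DO UPDATE`; either way the scheduler loop would no longer crash when a deferred 
task comes back with an unchanged `try_number`.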
   
   ### How to reproduce
   
   Run the HITL example DAG (if the UI is not merged yet, use PR #53035 to have 
a UI) and wait for the timeout of task `valid_input_and_options`, which has a 
timeout of 1 minute. The scheduler will crash.
   
   ### Operating System
   
   Ubuntu 24.04
   
   ### Versions of Apache Airflow Providers
   
   Edge3 provider from main, but probably all released versions of the Edge 
provider have this problem.
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Started from main using breeze. Used command line:
   `breeze start-airflow --python 3.12 --backend postgres --executor 
EdgeExecutor --load-example-dags`
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
