jscheffl opened a new issue, #67385:
URL: https://github.com/apache/airflow/issues/67385

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.2rc1
   
   ### What happened and how to reproduce it?
   
   The scheduler crashes with the following error on 3.2.2rc1.
   
   ```
   2026-05-22T19:57:55.471027Z [info     ] Adopting or resetting orphaned tasks for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2809
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1967, in _exec_single_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 952, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DatatypeMismatch: CASE types text and uuid cannot be matched
   LINE 1: ...executor.CeleryExecutor', 'CeleryExecutor')) THEN gen_random...
                                                                ^
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 6, in <module>
       sys.exit(main())
                ^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
55, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
113, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 66, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 69, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/memray_utils.py",
 line 60, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 100, in wrapper
       return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
355, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
384, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1531, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1670, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1815, in _do_scheduling
       num_queued_tis = 
self._critical_section_enqueue_task_instances(session=session)
                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1076, in _critical_section_enqueue_task_instances
       queued_tis = self._executable_task_instances_to_queued(max_tis, 
session=session)
                    
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 964, in _executable_task_instances_to_queued
       result = session.execute(queued_update.returning(TI.id, 
TI.external_executor_id))
                
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 2351, in execute
       return self._execute_internal(
              ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 2249, in _execute_internal
       result: Result[Any] = compile_state_cls.orm_execute_statement(
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/bulk_persistence.py",
 line 1660, in orm_execute_statement
       return super().orm_execute_statement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/context.py", 
line 306, in orm_execute_statement
       result = conn.execute(
                ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1419, in execute
       return meth(
              ^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 527, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1641, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1846, in _execute_context
       return self._exec_single_context(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1986, in _exec_single_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2363, in _handle_dbapi_exception
       raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1967, in _exec_single_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 952, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DatatypeMismatch) CASE types text and uuid cannot be matched
   LINE 1: ...executor.CeleryExecutor', 'CeleryExecutor')) THEN gen_random...
                                                                ^
   
   [SQL: UPDATE task_instance SET state=%(state)s, queued_dttm=%(queued_dttm)s, queued_by_job_id=%(queued_by_job_id)s, updated_at=%(updated_at)s, external_executor_id=CASE WHEN (task_instance.executor IN (%(executor_1_1)s, %(executor_1_2)s)) THEN gen_random_uuid() WHEN (task_instance.executor IS NULL) THEN gen_random_uuid() ELSE task_instance.external_executor_id END WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.map_index = %(map_index_1)s AND task_instance.task_id IN (%(task_id_1_1)s) RETURNING task_instance.id, task_instance.external_executor_id]
   [parameters: {'state': <TaskInstanceState.QUEUED: 'queued'>, 'queued_dttm': datetime.datetime(2026, 5, 22, 19, 57, 55, 857339, tzinfo=Timezone('UTC')), 'queued_by_job_id': 223, 'updated_at': datetime.datetime(2026, 5, 22, 19, 57, 55, 859683, tzinfo=Timezone('UTC')), 'dag_id_1': 'dt_flow_maintain_workers', 'run_id_1': 'scheduled__2026-05-22T19:30:00+00:00', 'map_index_1': -1, 'executor_1_1': 'airflow.providers.celery.executors.celery_executor.CeleryExecutor', 'executor_1_2': 'CeleryExecutor', 'task_id_1_1': 'maintain_worker_queues_pools'}]
   (Background on this error at: https://sqlalche.me/e/20/f405)
   stream closed: EOF for dev-sje2lr/dt-flow-scheduler-6d45f475bb-wb2bk (scheduler)
   ```
   
   Backend: Postgres
   Versions used:
   - apache-airflow                            3.2.2rc1
   - apache-airflow-core                       3.2.2rc1
   - apache-airflow-providers-celery           3.19.0
   - apache-airflow-providers-edge3            3.6.0
   
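   I assume this is caused by PR https://github.com/apache/airflow/pull/65711 
(back-ported from https://github.com/apache/airflow/pull/65594). Looking at the 
code, I assume the bug is hit in setups with multiple executors: the CASE 
expression mixes `gen_random_uuid()` (type uuid) with the existing 
`external_executor_id` column (type text), which Postgres refuses to unify. 
The UUID must be converted to a string (cast to text) in the CASE statement.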
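
To illustrate the suggested cast, here is a minimal SQLAlchemy sketch. It is not the actual Airflow code or patch: the `task_instance` table below is a trimmed, hypothetical stand-in, and `build_queued_update` is a name made up for this example. It only shows that wrapping `gen_random_uuid()` in a cast to text gives every CASE branch the same type.

```python
from sqlalchemy import Column, MetaData, Table, Text, case, cast, func, update
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical, trimmed-down stand-in for Airflow's task_instance table.
task_instance = Table(
    "task_instance",
    metadata,
    Column("task_id", Text, primary_key=True),
    Column("executor", Text),
    Column("external_executor_id", Text),
)


def build_queued_update(executor_names):
    """Build an UPDATE whose CASE branches all evaluate to text."""
    # Cast the uuid produced by Postgres to text so it matches the
    # type of the external_executor_id column used in the ELSE branch.
    new_id = cast(func.gen_random_uuid(), Text)
    return update(task_instance).values(
        external_executor_id=case(
            (task_instance.c.executor.in_(executor_names), new_id),
            (task_instance.c.executor.is_(None), new_id),
            else_=task_instance.c.external_executor_id,
        )
    )


# Compile against the PostgreSQL dialect; no database connection is needed.
stmt = build_queued_update(
    [
        "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        "CeleryExecutor",
    ]
)
print(stmt.compile(dialect=postgresql.dialect()))
```

The compiled statement should contain `CAST(gen_random_uuid() AS TEXT)` in both `THEN` branches, so Postgres no longer has to reconcile uuid with text.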
   
   ### What you think should happen instead?
   
   The scheduler should not crash.
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Apache Airflow Provider(s)
   
   celery
   
   ### Versions of Apache Airflow Providers
   
   - apache-airflow-providers-celery           3.19.0
   - apache-airflow-providers-edge3            3.6.0
   
   ### Official Helm Chart version
   
   1.19.0
   
   ### Kubernetes Version
   
   ?
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   Some local packages and patches.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

