qamasailer opened a new issue, #56424:
URL: https://github.com/apache/airflow/issues/56424
### Apache Airflow version
3.1.0
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
After the upgrade to Airflow 3.1.0 the failing of one of our DAGs and its
email notification for that led to the scheduler crashing with the following
error message.
```
sys.exit(main())
^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line
55, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line
114, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
line 54, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
line 52, in scheduler
run_command_with_daemon_option(
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
line 55, in <lambda>
callback=lambda: _run_scheduler_job(args),
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
line 43, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line
368, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line
397, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1042, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1345, in _run_scheduler_loop
num_finished_events += self._process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 745, in _process_executor_events
return SchedulerJobRunner.process_executor_events(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
line 977, in process_executor_events
executor.send_callback(email_request)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py",
line 586, in send_callback
self.callback_sink.send(request)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 99, in wrapper
with create_session() as session:
^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/contextlib.py", line 144, in __exit__
next(self.gen)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 43, in create_session
session.commit()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 1454, in commit
self._transaction.commit(_to_root=self.future)
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 832, in commit
self._prepare_impl()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 811, in _prepare_impl
self.session.flush()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3449, in flush
self._flush(objects)
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3549, in _flush
flush_context.execute()
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
rec.execute(self)
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
util.preloaded.orm_persistence.save_obj(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
_emit_insert_statements(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 1238, in _emit_insert_statements
result = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py",
line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1953, in _execute_context
self._handle_dbapi_exception(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 2134, in _handle_dbapi_exception
util.raise_(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1910, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.DataError: (psycopg2.errors.StringDataRightTruncation) value
too long for type character varying(20)
[SQL: INSERT INTO callback_request (created_at, priority_weight,
callback_data, callback_type) VALUES (%(created_at)s, %(priority_weight)s,
%(callback_data)s, %(callback_type)s) RETURNING callback_request.id]
[parameters: {'created_at': datetime.datetime(2025, 10, 6, 7, 43, 34,
612854, tzinfo=Timezone('UTC')), 'priority_weight': 1, 'callback_data':
'"{\\"filepath\\":\\"commons/esg_load.py\\",\\"bundle_name\\":\\"dags-folder\\",\\"bundle_version\\":null,\\"msg\\":\\"Executor
CeleryExecutor(paralle ... (1361 characters truncated) ...
:null,\\"next_method\\":null,\\"next_kwargs\\":null,\\"xcom_keys_to_clear\\":[],\\"should_retry\\":false},\\"type\\":\\"EmailNotificationRequest\\"}"',
'callback_type': 'EmailNotificationRequest'}]
(Background on this error at: https://sqlalche.me/e/14/9h9h)
```
### What you think should happen instead?
We made a quick-fix by changing the callback_type field to length 30,
however I think that this is not a stable and good solution and should be fixed
in airflow by either changing the field length in the models or by changing the
value for this callback_type to something below 20 characters.
### How to reproduce
I tried reproducing, but I cannot get it to fail in the running system again.
I can understand why its happening by going through the code:
https://github.com/apache/airflow/blob/cc1a0007eb20c6c29f420e8f8867f2eaf56de787/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L977
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/base_executor.py#L586
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/callbacks/database_callback_sink.py#L38
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/db_callback_request.py#L48
uses the class name which is too long for the field.
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]