wolfier opened a new issue, #42485:
URL: https://github.com/apache/airflow/issues/42485
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.10.2
### What happened?
The triggerer terminates when attempting to deserialize the task context
dictionary.
```
[2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting
the triggerer
[2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR -
Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 346, in _execute
self._run_trigger_loop()
File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 374, in _run_trigger_loop
self.load_triggers()
File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 400, in load_triggers
self.trigger_runner.update_triggers(set(ids))
File
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 719, in update_triggers
new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py",
line 94, in kwargs
return self._decrypt_kwargs(self.encrypted_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py",
line 130, in _decrypt_kwargs
return BaseSerialization.deserialize(decrypted_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 831, in deserialize
return {k: cls.deserialize(v, use_pydantic_models) for k, v in
var.items()}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 821, in deserialize
d[k] = cls.deserialize(v, use_pydantic_models=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 803, in deserialize
raise RuntimeError(
RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in
progress) feature flag to be true. This parameter will be removed eventually
when new serialization is used by AIP-44
```
The issue started appearing when I upgraded from Airflow 2.9.3 to Airflow
2.10.2.
Do note that I have a custom trigger where I am serializing the task context.
```
class CustomTrigger(BaseTrigger):
...
def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Serializes the trigger's arguments and classpath.
:return: A tuple containing the classpath and a dictionary of
arguments.
"""
return (
"CustomTrigger",
{
"context": self.context,
},
)
```
### What you think should happen instead?
I believe the deserialize operation should not be [forcing
`use_pydantic_models` to be
true](https://github.com/apache/airflow/blob/2.10.2/airflow/serialization/serialized_objects.py#L816-L821C43).
Instead, it should be using the value passed as a parameter.
```
d[k] = cls.deserialize(v, use_pydantic_models)
```
Also, when [the task context is being
serialized](https://github.com/apache/airflow/blob/2.10.2/airflow/serialization/serialized_objects.py#L764-L769),
it is respecting the value passed to the serialized function.
### How to reproduce
1. Create a custom Trigger that serializes the task context
2. Create a deferrable operator that uses the custom trigger
3. Run the task
### Operating System
n/a
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]