wolfier opened a new issue, #42485:
URL: https://github.com/apache/airflow/issues/42485

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.2
   
   ### What happened?
   
   The triggerer exits with a `RuntimeError` when it attempts to deserialize the task context dictionary stored in a trigger's kwargs.
   
   ```
   [2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting the triggerer
   [2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 346, in _execute
       self._run_trigger_loop()
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 374, in _run_trigger_loop
       self.load_triggers()
     File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 400, in load_triggers
       self.trigger_runner.update_triggers(set(ids))
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 719, in update_triggers
       new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
                                              ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 94, in kwargs
       return self._decrypt_kwargs(self.encrypted_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 130, in _decrypt_kwargs
       return BaseSerialization.deserialize(decrypted_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 831, in deserialize
       return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 821, in deserialize
       d[k] = cls.deserialize(v, use_pydantic_models=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 803, in deserialize
       raise RuntimeError(
   RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. This parameter will be removed eventually when new serialization is used by AIP-44
   ```
   
   The issue started appearing after I upgraded from Airflow 2.9.3 to Airflow 2.10.2.
   
   Note that I have a custom trigger that serializes the task context:
   
   ```
   from typing import Any
   
   from airflow.triggers.base import BaseTrigger
   
   
   class CustomTrigger(BaseTrigger):
       ...
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           """
           Serialize the trigger's arguments and classpath.
   
           :return: A tuple containing the classpath and a dictionary of arguments.
           """
           return (
               "CustomTrigger",
               {
                   # The whole task context is passed through as a trigger kwarg.
                   "context": self.context,
               },
           )
   ```
   
   ### What you think should happen instead?
   
   I believe the deserialize operation should not be [forcing `use_pydantic_models` to be true](https://github.com/apache/airflow/blob/2.10.2/airflow/serialization/serialized_objects.py#L816-L821C43).
   
   Instead, it should use the value passed in as a parameter:
   
   ```
                   d[k] = cls.deserialize(v, use_pydantic_models)
   ```
   
   Also, when [the task context is being serialized](https://github.com/apache/airflow/blob/2.10.2/airflow/serialization/serialized_objects.py#L764-L769), it respects the value passed to the `serialize` function.
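
To illustrate the difference between forcing the flag and propagating it, here is a minimal, self-contained sketch. It is a simplified stand-in and not Airflow's actual code: `AIP_44_ENABLED`, the `force_flag` switch, and the `"__type"` / `"__var"` envelope keys are assumptions made for this example only.

```python
from typing import Any

# Hypothetical stand-in for the AIP-44 feature flag (disabled by default).
AIP_44_ENABLED = False


def deserialize(var: Any, use_pydantic_models: bool = False, *, force_flag: bool) -> Any:
    """Toy recursive deserializer.

    force_flag=True mimics the reported behaviour (nested context values are
    always deserialized with use_pydantic_models=True); force_flag=False mimics
    the proposed behaviour (the caller's value is passed through unchanged).
    """
    if use_pydantic_models and not AIP_44_ENABLED:
        raise RuntimeError("use_pydantic_models=True requires the AIP-44 feature flag")

    # A serialized task context, modelled here as a tagged envelope.
    if isinstance(var, dict) and var.get("__type") == "task_context":
        nested_flag = True if force_flag else use_pydantic_models
        return {
            k: deserialize(v, nested_flag, force_flag=force_flag)
            for k, v in var["__var"].items()
        }

    # Plain dicts (such as trigger kwargs) always propagate the caller's flag.
    if isinstance(var, dict):
        return {
            k: deserialize(v, use_pydantic_models, force_flag=force_flag)
            for k, v in var.items()
        }

    return var


# Trigger kwargs that carry a (toy) serialized task context.
payload = {
    "context": {
        "__type": "task_context",
        "__var": {"run_id": "manual__1", "try_number": 1},
    }
}
```

With this payload, `deserialize(payload, force_flag=True)` raises `RuntimeError` even though the caller never asked for pydantic models, while `deserialize(payload, force_flag=False)` returns the plain nested dictionary.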
   
   ### How to reproduce
   
   1. Create a custom Trigger that serializes the task context
   2. Create a deferrable operator that uses the custom trigger
   3. Run the task
   
   ### Operating System
   
   n/a
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

