jonah-prismfp opened a new issue, #49635:
URL: https://github.com/apache/airflow/issues/49635

   ### Description
   
   I'm writing a custom `BaseTrigger` subclass and have run into a problem when
   I pass a `frozenset` argument. Here is an example of the layout of my class:
   ```python
   import typing
   
   from airflow.exceptions import AirflowException
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   class CustomBaseTrigger(BaseTrigger):
       def __init__(self, success_state: frozenset = frozenset({"item_1"})):
           self.success_state = success_state
           self.__validate_input()
   
       def serialize(self) -> tuple[str, dict[str, typing.Any]]:
           """Serialize the trigger param and module path."""
           return ("/path/to/this/file.py", {"success_state": self.success_state})
   
       def __validate_input(self):
           checker = frozenset({"item_2"})
           if len(self.success_state.intersection(checker)) > 0:
               raise AirflowException("Non trivial intersection.")
   
       async def run(self) -> typing.AsyncIterator[TriggerEvent]:
           ...
   ``` 
   The trigger is created in a custom `BaseOperator` subclass like so:
   ```python
   from airflow.models import BaseOperator
   from airflow.triggers.base import BaseTrigger
   from airflow.utils.context import Context
   
   
   class CustomBaseOperator(BaseOperator):
   
       def __init__(self, task_id: str, **kwargs):
           super().__init__(task_id=task_id, **kwargs)
   
       def execute(self, context: Context):
           self.defer(trigger=CustomBaseTrigger(), method_name="execute_complete")
   
       def execute_complete(self, context: Context):
           return
   ``` 
   However, this raises the following traceback when executed:
   ```shell
   Traceback (most recent call last):
   File "/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
       self._run_trigger_loop()
   File "airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
       self.load_triggers()
   File "airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
       self.trigger_runner.update_triggers(set(ids))
   File "airflow/jobs/triggerer_job_runner.py", line 676, in update_triggers
       new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
   File "airflow_tools/triggers/upstream.py", line 56, in __init__
       self.__validate_input()
   File "airflow_tools/triggers/upstream.py", line 72, in __validate_input
       if len(self.success_state.intersection(checker)) > 0:
   AttributeError: 'str' object has no attribute 'intersection'
   ``` 
   
   
   It appears that serializing the trigger converts the `success_state`
   argument to a `str` rather than keeping it as a `frozenset`. I'm surprised
   that a built-in Python type isn't supported for custom triggers, though I
   appreciate I'm not a whiz at Airflow and there might be a good reason for this.
   Does anyone know if or when this could be resolved?
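
One possible workaround, as a minimal sketch: emit a JSON-friendly list from `serialize()` and rebuild the `frozenset` in `__init__`. The class below is a stand-in that does not import Airflow, and the `round_trip` helper is hypothetical; it uses `json` only to imitate kwargs being stored as text and read back, which is an assumption about the storage step rather than Airflow's actual code path.

```python
import json
import typing


class SuccessStateTriggerSketch:
    """Stand-in for the trigger above; it deliberately does not import Airflow."""

    def __init__(self, success_state: typing.Iterable[str] = ("item_1",)):
        # Reject a bare string early instead of silently iterating its characters.
        if isinstance(success_state, str):
            raise TypeError("success_state must be an iterable of strings, not a str")
        # Accept any iterable so the list read back from storage becomes a frozenset again.
        self.success_state = frozenset(success_state)

    def serialize(self) -> tuple[str, dict[str, typing.Any]]:
        # Emit a sorted list: JSON-friendly and deterministic.
        return ("/path/to/this/file.py", {"success_state": sorted(self.success_state)})


def round_trip(trigger):
    """Hypothetical helper: store the kwargs as JSON text, then rebuild the trigger."""
    _, kwargs = trigger.serialize()
    stored = json.dumps(kwargs)
    return type(trigger)(**json.loads(stored))


restored = round_trip(SuccessStateTriggerSketch(frozenset({"item_1", "item_3"})))
print(restored.success_state.intersection({"item_2"}))  # prints frozenset()
```

With this shape, a check like `__validate_input` would operate on a real `frozenset` regardless of what the storage layer hands back, as long as it hands back some iterable of strings.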
   
   ### Use case/motivation
   
   It would make custom triggers far more flexible and adaptable if you could
   pass any type of argument to them rather than being restricted to the basic
   Python ones. Perhaps the kwargs could be pickled and unpickled as a more
   flexible way of storing this data when it is persisted to the trigger table
   in the database?
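
To illustrate the pickling idea, here is a small sketch using only the standard library. The function names `encode_kwargs` and `decode_kwargs` are hypothetical and not part of Airflow; base64 is used on the assumption that the stored value has to fit in a text column.

```python
import base64
import pickle


def encode_kwargs(kwargs: dict) -> str:
    """Pickle the kwargs and base64-encode the bytes so they can be stored as text."""
    return base64.b64encode(pickle.dumps(kwargs)).decode("ascii")


def decode_kwargs(stored: str) -> dict:
    """Reverse encode_kwargs. Only safe on data the deployment itself wrote."""
    return pickle.loads(base64.b64decode(stored.encode("ascii")))


original = {"success_state": frozenset({"item_1"})}
restored = decode_kwargs(encode_kwargs(original))
print(type(restored["success_state"]).__name__)  # prints frozenset

# For comparison, a plain str() cast loses the type.
print(type(str(original["success_state"])).__name__)  # prints str
```

The trade-off is that unpickling can execute arbitrary code, so this approach would only be reasonable if everything written to the trigger table is trusted.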
   
   ### Related issues
   
   I'm not sure. I should say that I'm running this on Airflow 2.9.3, so this
   might have been resolved since then. Thanks for all the help!
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
