jonah-prismfp opened a new issue, #49635:
URL: https://github.com/apache/airflow/issues/49635
### Description
I'm writing a custom `BaseTrigger` subclass and have run into a problem when
I pass a `frozenset` argument. Here is a minimal example of the class layout:
```python
import typing

from airflow.exceptions import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent


class CustomBaseTrigger(BaseTrigger):
    def __init__(self, success_state: frozenset = frozenset({"item_1"})):
        self.success_state = success_state
        self.__validate_input()

    def serialize(self) -> tuple[str, dict[str, typing.Any]]:
        """Serialize the trigger param and module path."""
        return ("/path/to/this/file.py", {"success_state": self.success_state})

    def __validate_input(self):
        checker = frozenset({"item_2"})
        if len(self.success_state.intersection(checker)) > 0:
            raise AirflowException("Non trivial intersection.")

    async def run(self) -> typing.AsyncIterator[TriggerEvent]:
        ...
```
This trigger is used in a custom `BaseOperator` subclass like so:
```python
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger
from airflow.utils.context import Context


class CustomBaseOperator(BaseOperator):
    def __init__(self, task_id: str, **kwargs):
        super().__init__(task_id=task_id, **kwargs)

    def execute(self, context: Context):
        self.defer(trigger=CustomBaseTrigger(), method_name="execute_complete")

    def execute_complete(self, context: Context):
        return
```
However, this raises the following traceback when executed:
```shell
Traceback (most recent call last):
  File "/airflow/jobs/triggerer_job_runner.py", line 339, in _execute
    self._run_trigger_loop()
  File "airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
    self.load_triggers()
  File "airflow/jobs/triggerer_job_runner.py", line 377, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "airflow/jobs/triggerer_job_runner.py", line 676, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
  File "airflow_tools/triggers/upstream.py", line 56, in __init__
    self.__validate_input()
  File "airflow_tools/triggers/upstream.py", line 72, in __validate_input
    if len(self.success_state.intersection(checker)) > 0:
AttributeError: 'str' object has no attribute 'intersection'
```
It appears that serializing the trigger casts the `success_state` argument to
a string instead of preserving it as a `frozenset`. I'm surprised a basic
Python type isn't supported for custom triggers, though I appreciate I'm no
Airflow whiz and there might be a good reason for this.
Does anyone know if and when this could be resolved?
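
For anyone hitting the same error, a possible workaround is sketched below. It is not a confirmed fix and does not use Airflow at all: the class is a plain stand-in for a trigger, the classpath string is hypothetical, and a JSON round trip is only an assumed approximation of how the kwargs are stored and reloaded. The idea is to emit a plain list from `serialize()` and rebuild the `frozenset` in `__init__`.

```python
import json
from typing import Any, Iterable


class FrozensetKwargSketch:
    """Plain stand-in for a trigger; a real one would subclass BaseTrigger."""

    def __init__(self, success_state: Iterable[str] = ("item_1",)):
        # Reject a bare string early: frozenset("abc") would silently
        # become a set of single characters.
        if isinstance(success_state, str):
            raise TypeError("success_state must be an iterable of strings")
        # Accept a list, tuple, set or frozenset and normalise to frozenset.
        self.success_state = frozenset(success_state)

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Emit a sorted list, which JSON-style serializers handle natively.
        # The classpath below is a hypothetical placeholder.
        return (
            "path.to.FrozensetKwargSketch",
            {"success_state": sorted(self.success_state)},
        )


original = FrozensetKwargSketch(frozenset({"item_1", "item_3"}))
classpath, kwargs = original.serialize()

# Assumption: a JSON round trip stands in for storing and reloading kwargs.
restored_kwargs = json.loads(json.dumps(kwargs))
restored = FrozensetKwargSketch(**restored_kwargs)

assert restored.success_state == original.success_state
```

With this shape, `__validate_input()` would still see a `frozenset` after the trigger is rebuilt from its stored kwargs, provided lists survive Airflow's own serialization unchanged.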
### Use case/motivation
It would make custom triggers far more flexible and adaptable if you could
pass any type of argument to them, rather than being restricted to the basic
Python ones. Perhaps the kwargs could be pickled and unpickled as a more
flexible way of storing this data when caching to the trigger table in the
database?
### Related issues
I'm not sure. I should say I'm running this on Airflow 2.9.3, so this might
have been resolved since then. Thanks for all the help!
### Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)