MatrixManAtYrService opened a new issue #19884:
URL: https://github.com/apache/airflow/issues/19884
### Description
I wish that Airflow would look at the type hints on `@task`-decorated
functions to determine:
1. Do the hinted types provide serialize/deserialize functions (like `Foo`),
or are they callable (like `int`)?
```python
from airflow.decorators import task


class Foo:
    ...

    def to_json(self) -> str:
        pass


@task
def do_thing(some_num: int, some_obj: Foo) -> Bar:
    pass
```
2. Do upstream tasks write `return_value`s to XCom that conflict with the
parameter types of downstream tasks?
If 1, I'd like Airflow to initialize the desired type for me.
If 2, I'd like Airflow to warn me about the type conflicts at parse time.
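
To make item 1 concrete, here is a minimal sketch of the behavior I have in mind, written without Airflow so it runs on its own. `coerce_args` is a hypothetical decorator, not an existing Airflow API, and using `from_json` as the deserialization hook is an assumption; a real implementation would presumably live inside `@task`.

```python
import functools
import inspect
import json
from typing import get_type_hints


def coerce_args(func):
    """Hypothetical decorator: rebuild hinted types from plain values."""
    hints = get_type_hints(func)
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in list(bound.arguments.items()):
            hint = hints.get(name)
            # Skip unhinted parameters, non-class hints, and values
            # that already have the hinted type.
            if not isinstance(hint, type) or isinstance(value, hint):
                continue
            if hasattr(hint, "from_json"):
                # The type provides its own deserializer (like Foo).
                bound.arguments[name] = hint.from_json(value)
            else:
                # Otherwise treat the type as a plain callable (like int).
                bound.arguments[name] = hint(value)
        return func(*bound.args, **bound.kwargs)

    return wrapper


class Foo:
    def __init__(self, bar: str):
        self.bar = bar

    def to_json(self) -> str:
        return json.dumps({"bar": self.bar})

    @classmethod
    def from_json(cls, raw: str) -> "Foo":
        return cls(**json.loads(raw))


@coerce_args
def do_thing(some_num: int, some_obj: Foo) -> str:
    return f"{some_num + 1}:{some_obj.bar}"
```

Here `do_thing("41", Foo("wakka").to_json())` receives an `int` and a `Foo` even though both values were passed as strings. The sketch only handles plain classes; generics such as `List[Foo]` would need extra handling.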
### Use case/motivation
I usually don't find it to be burdensome to manipulate jumbles of
Tuples/Dicts/Lists. Because of this, I don't write a lot of classes.
But I've been using the Taskflow API lately, and there's something about
working with it that makes me want to type-hint everything that becomes an
XComArg. Maybe the part of my brain that used to keep track of the
Tuple/Dict/List soup is now keeping track of whether this is task-code or
dag-definition-code, it's hard to say.
Whatever the reason, this has led me to write dags that look something like
this:
```python3
from dataclasses import dataclass
from typing import List

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from dataclasses_json import dataclass_json


@dataclass_json
@dataclass
class Foo:
    bar: str


@dataclass_json
@dataclass
class Baz:
    foos: List[Foo]


@task
def get_baz():
    foos = [Foo(x) for x in ["wakka", "bang"]]
    # convert to a plain dict by hand before it goes to XCom
    return Baz(foos).to_dict()


@task
def whats_a_baz(_baz: Baz) -> Baz:
    # the XCom value arrives as a dict; rebuild the object by hand
    baz = Baz.from_dict(_baz)
    print(baz)


@task
def whats_are_bazzes(_bazzes: List[Baz]):
    bazzes = [Baz.from_dict(x) for x in _bazzes]
    print(bazzes)


with DAG(
    dag_id="request",
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    one_baz = get_baz()
    two_baz = get_baz()

    whats_a_baz(one_baz)
    # Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])

    whats_are_bazzes([one_baz, two_baz])
    # [Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')]),
    #  Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])]
```
I like this because if I'm wrong about the shape of my data in an early
task, I notice it when that task fails to convert the data into custom objects.
Without these conversions, mistakes show up when they cause problems
downstream, not where they were introduced.
I dislike this because all of those to/from calls are ugly and easy to get
wrong.
This raised two questions:
1. Since the hinted types have `from_json()` and `to_json()` functions,
could Airflow handle the conversions for me?
2. Airflow knows which XComs are generated as task outputs and which are
later used as task inputs, so could it inform me of type conflicts at parse
time?
If so, I'd be able to iterate faster since a whole category of bug would be
catchable in a tighter debug loop (i.e. before even running the task).
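
As a rough illustration of question 2, the check could compare an upstream function's return hint with the downstream parameter hint. This is only a sketch under simplifying assumptions: `find_type_conflict` is a hypothetical helper, not an Airflow function, and it uses plain equality rather than handling subclasses, `Optional`, or generics.

```python
from typing import Callable, Optional, get_type_hints


def find_type_conflict(
    upstream: Callable, downstream: Callable, param: str
) -> Optional[str]:
    """Return a warning if upstream's return hint differs from the
    hint on downstream's `param`; return None otherwise."""
    produced = get_type_hints(upstream).get("return")
    expected = get_type_hints(downstream).get(param)
    if produced is None or expected is None:
        # Nothing to compare when either side is unhinted.
        return None
    if produced != expected:
        return (
            f"{upstream.__name__} returns {produced!r}, but "
            f"{downstream.__name__}({param}) expects {expected!r}"
        )
    return None


def get_count() -> int:
    return 3


def show_name(name: str) -> None:
    print(name)


def show_count(count: int) -> None:
    print(count)


# Wiring get_count into show_name should be flagged; show_count should not.
conflict = find_type_conflict(get_count, show_name, "name")
no_conflict = find_type_conflict(get_count, show_count, "count")
```

Since this only needs the function signatures, it could run while the DAG file is parsed, before any task executes.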
Thanks for considering my feature!
### Related issues
_No response_
### Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)