MatrixManAtYrService opened a new issue #19884:
URL: https://github.com/apache/airflow/issues/19884


   ### Description
   
   I wish that Airflow would look at type hints on `@task`-decorated functions to determine:
   
   1. Do the hinted types provide serialize/deserialize functions (like `Foo`), or are they callable (like `int`)?
   
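   ```python
   from airflow.decorators import task
   
   class Foo:
       def to_json(self) -> str:
           ...
   
       @classmethod
       def from_json(cls, data: str) -> "Foo":
           ...
   
   class Bar:
       ...
   
   @task
   def do_thing(some_num: int, some_obj: Foo) -> Bar:
       ...
   ```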
   
   2. Do upstream tasks write `return_value`s to XCom that conflict with the parameter types of downstream tasks?
   
   If 1, I'd like Airflow to initialize the desired type for me.
   If 2, I'd like Airflow to warn me about the type conflicts at parse time.
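To make item 1 concrete, here is a minimal sketch in plain Python (no Airflow involved) of the lookup being asked for: if the hinted class offers a `from_json` or `from_dict` classmethod, use it; otherwise call the hinted type itself, as with `int`. The names `coerce` and `call_with_coercion` are hypothetical, not an Airflow API, and the `from_json`/`from_dict` naming convention is borrowed from `dataclasses_json`. `Foo` defines its own `from_dict` so the snippet runs without that library.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, get_type_hints


def coerce(value: Any, hint: Any) -> Any:
    """Turn a raw XCom value into the type named by a parameter hint (hypothetical)."""
    if hint is Any or not isinstance(hint, type):
        return value  # no hint, Any, or a generic like List[Foo]: pass through
    if isinstance(value, hint):
        return value  # already the right type
    from_json = getattr(hint, "from_json", None)
    if callable(from_json) and isinstance(value, str):
        return from_json(value)  # the type knows how to deserialize itself
    from_dict = getattr(hint, "from_dict", None)
    if callable(from_dict) and isinstance(value, dict):
        return from_dict(value)
    return hint(value)  # otherwise treat the type as a callable, e.g. int("41")


def call_with_coercion(func: Callable, *args: Any) -> Any:
    """Coerce each positional argument using func's hints, then call func."""
    hints = get_type_hints(func)
    names = list(inspect.signature(func).parameters)
    coerced = [coerce(arg, hints.get(name)) for name, arg in zip(names, args)]
    return func(*coerced)


@dataclass
class Foo:
    bar: str

    @classmethod
    def from_dict(cls, data: dict) -> "Foo":
        return cls(**data)


def do_thing(some_num: int, some_obj: Foo) -> str:
    return f"{some_num + 1} {some_obj.bar}"


print(call_with_coercion(do_thing, "41", {"bar": "wakka"}))  # 42 wakka
```

Generic hints such as `List[Foo]` pass through untouched in this sketch; handling them would mean inspecting `typing.get_origin` and `typing.get_args`, which is left out here.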
   
   ### Use case/motivation
   
   I usually don't find it burdensome to manipulate jumbles of Tuples/Dicts/Lists. Because of this, I don't write a lot of classes.
   
   But I've been using the TaskFlow API lately, and something about working with it makes me want to type-hint everything that becomes an XComArg. Maybe the part of my brain that used to keep track of the Tuple/Dict/List soup is now busy tracking whether code is task code or DAG-definition code; it's hard to say.
   
   Whatever the reason, this has led me to write DAGs that look something like this:
   
   ```python
   from dataclasses import dataclass
   from typing import List
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.dates import days_ago
   from dataclasses_json import dataclass_json
   
   
   @dataclass_json
   @dataclass
   class Foo:
       bar: str
   
   
   @dataclass_json
   @dataclass
   class Baz:
       foos: List[Foo]
   
   
   @task
   def get_baz():
       foos = [Foo(x) for x in ["wakka", "bang"]]
       # convert to a plain dict so the default (JSON) XCom backend can store it
       return Baz(foos).to_dict()
   
   
   @task
   def whats_a_baz(_baz: Baz):
       # the hint says Baz, but what actually arrives is the dict from get_baz
       baz = Baz.from_dict(_baz)
       print(baz)
   
   
   @task
   def whats_are_bazzes(_bazzes: List[Baz]):
       bazzes = [Baz.from_dict(x) for x in _bazzes]
       print(bazzes)
   
   
   with DAG(
       dag_id="request",
       schedule_interval=None,
       start_date=days_ago(1),
   ) as dag:
   
       one_baz = get_baz()
       two_baz = get_baz()
   
       whats_a_baz(one_baz)
       # Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])
   
       whats_are_bazzes([one_baz, two_baz])
       # [Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')]),
       #  Baz(foos=[Foo(bar='wakka'), Foo(bar='bang')])]
   ```
   
   I like this because if I'm wrong about the shape of my data in an early task, I notice it when that task fails to convert the data into custom objects. Without these conversions, mistakes show up when they cause problems downstream, not where they were introduced.
   
   I dislike this because all of those to/from calls are ugly and easy to get wrong.
   
   This raised two questions:
     1. Since the hinted types have `from_json()` and `to_json()` methods, could Airflow handle the conversions for me?
     2. Airflow knows which XComs are generated as task outputs and which are later used as task inputs, so could it inform me of type conflicts at parse time?
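For question 2, a parse-time check could compare the return hint of an upstream function with the parameter hint of the downstream one. The sketch below assumes the caller already knows which producer feeds which parameter (something Airflow would have to work out from the `XComArg` wiring), and it uses plain functions in place of `@task`-decorated ones. `check_edge` is a hypothetical helper, not part of Airflow.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, get_type_hints


@dataclass
class Foo:
    bar: str


@dataclass
class Baz:
    foos: List[Foo]


def check_edge(producer: Callable, consumer: Callable, param: str) -> Optional[str]:
    """Return a warning if producer's return hint conflicts with consumer's hint.

    Returns None when the hints agree, or when either side is unannotated
    (there is nothing to compare).
    """
    produced = get_type_hints(producer).get("return")
    expected = get_type_hints(consumer).get(param)
    if produced is None or expected is None or expected is Any:
        return None
    if isinstance(produced, type) and isinstance(expected, type):
        if issubclass(produced, expected):
            return None  # same type, or a subclass of the expected type
    elif produced == expected:
        return None  # identical generic hints, e.g. List[Foo] and List[Foo]
    return (
        f"{producer.__name__} returns {produced!r}, "
        f"but {consumer.__name__}() expects {expected!r} for {param!r}"
    )


# Plain functions standing in for @task-decorated ones.
def get_baz() -> Baz:
    return Baz([Foo("wakka"), Foo("bang")])


def get_dict() -> dict:
    return {"foos": []}


def whats_a_baz(_baz: Baz) -> None:
    print(_baz)


print(check_edge(get_baz, whats_a_baz, "_baz"))  # None
print(check_edge(get_dict, whats_a_baz, "_baz"))  # a warning naming both hints
```

If `get_baz` in the DAG above were annotated honestly as `-> dict` (it returns `Baz(foos).to_dict()`), this naive check would flag its edge into `whats_a_baz`. A real implementation would need to know that the value is serialized in between, which is where question 1 and question 2 meet.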
   
   If so, I'd be able to iterate faster, since a whole category of bugs would be catchable in a tighter debug loop (i.e., before even running the task).
   
   Thanks for considering my feature!
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

