I like the "return task" example from Ash for Operator, if we can do that.
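For illustration, here is a toy, self-contained model of the "return task" idea. It is not Airflow code: `Task`, `ToyEmptyOperator`, `task`, `Dag` and `dag` are hypothetical stand-ins invented for this sketch. It assumes the @dag wrapper simply marks whatever the decorated function returns (one task, or a tuple of tasks) as the result task(s). It also assumes the dag result is a dict keyed by task_id, as TP describes further down the thread.

```python
# Toy model of marking a dag's result by returning the task from the
# @dag-decorated function. Task, ToyEmptyOperator, task, Dag and dag are
# hypothetical stand-ins, not Airflow classes.
from __future__ import annotations

import functools
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Task:
    task_id: str
    fn: Callable[..., Any]
    upstream: list[Task] = field(default_factory=list)

    def run(self) -> Any:
        # Evaluate upstream tasks first and pass their values positionally.
        return self.fn(*(t.run() for t in self.upstream))


class ToyEmptyOperator(Task):
    """Operator-style stand-in: built from a task_id, returns None."""

    def __init__(self, task_id: str) -> None:
        super().__init__(task_id=task_id, fn=lambda: None)


def task(fn: Callable[..., Any]) -> Callable[..., Task]:
    """TaskFlow-style stand-in: calling the function builds a Task node."""
    @functools.wraps(fn)
    def make(*upstream: Task) -> Task:
        return Task(fn.__name__, fn, list(upstream))
    return make


@dataclass
class Dag:
    dag_id: str
    result_tasks: list[Task]

    def result(self) -> dict[str, Any]:
        # The dag result as a dict keyed by task_id.
        return {t.task_id: t.run() for t in self.result_tasks}


def dag(fn: Callable[[], Any]) -> Callable[[], Dag]:
    """Whatever the decorated function returns is marked as the result."""
    @functools.wraps(fn)
    def build() -> Dag:
        returned = fn()
        if returned is None:
            results: list[Task] = []  # nothing returned: no result task
        elif isinstance(returned, Task):
            results = [returned]
        else:
            results = list(returned)  # e.g. `return t1, t2`
        return Dag(fn.__name__, results)
    return build


@dag
def operator_dag():
    empty = ToyEmptyOperator(task_id="xyz")
    return empty  # works for an operator instance...


@dag
def taskflow_dag():
    @task
    def f1():
        return 1

    @task
    def f2(v):
        return v * 2

    return f2(f1())  # ...and for a TaskFlow call


print(operator_dag().result())  # {'xyz': None}
print(taskflow_dag().result())  # {'f2': 2}
```

The same `return` statement works whether the returned object comes from an operator constructor or a TaskFlow call. That is why this style covers operators as well as TaskFlow tasks.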
On Thu, 18 Jun 2026 at 15:59, Vincent Beck <[email protected]> wrote:
> 100%. I actually think this syntax makes a lot of sense; just by reading
> the code, you understand what it does.
>
> On 2026/06/18 12:26:24 Ash Berlin-Taylor wrote:
> > I thought we had a list somewhere of BaseOperator init kwargs that
> > cannot be mapped; for instance, you cannot map over the queue or
> > pool slots arguments today?
> >
> > But I agree with the rest of what TP says. If you want the result of an
> > operator to be marked as the dag result, you can switch to the @dag
> > decorator syntax:
> >
> >     @dag
> >     def my_dag():
> >         task = EmptyOperator(task_id="xyz")
> >         return task
> >
> > I think that is sufficient?
> >
> > -a
> >
> > > On 17 Jun 2026, at 04:19, Tzu-ping Chung via dev <[email protected]> wrote:
> > >
> > > Using an operator argument such as
> > >
> > >     task = EmptyOperator(task_id="xxx", result=True)
> > >
> > > would hint that you might be able to do
> > >
> > >     task = (
> > >         EmptyOperator
> > >         .partial(task_id="xxx")
> > >         .expand(result=[False, True])
> > >     )
> > >
> > > which I don’t think we should allow. (No, I don’t think this makes
> > > sense in the first place, but someone somewhere might.)
> > >
> > > Of course, we could add checks so this emits an error when the dag is
> > > parsed, but this is additional mental context (for both users and
> > > maintainers) that could be avoided entirely. AIP-52 did not propose
> > > MyOperator(..., setup=True) for the setup/teardown syntax either, and I
> > > want to be consistent with that, as mentioned previously.
> > >
> > > Personally I don’t like
> > >
> > >     @dag(return_task="my_task_id")
> > >
> > > since there would be two places to edit if you want to change the task
> > > id in the future. With the add_result() or return syntax mentioned
> > > previously, the handle used is a Python variable, so an incorrect name
> > > would be easy to catch with standard linters.
> > > The error emitted would also contain better context (line numbers etc.).
> > >
> > > TP
> > >
> > >> On 16 Jun 2026, at 21:17, Vincent Beck <[email protected]> wrote:
> > >>
> > >> What about operators? Do we only want to support tasks using the
> > >> TaskFlow API? If not, a new parameter would work for both use cases
> > >> (e.g. `result=True`).
> > >>
> > >> ```
> > >> @task(result=True)
> > >> def my_task(num):
> > >>     return num*2
> > >> ```
> > >>
> > >> ```
> > >> task = EmptyOperator(task_id="xxx", result=True)
> > >> ```
> > >>
> > >> Or a new parameter in the Dag constructor:
> > >>
> > >> ```
> > >> @dag(return_task="my_task_id")
> > >> ```
> > >>
> > >> The drawback of the latter is that it limits a Dag to a single result
> > >> task (and it seems we want to allow multiple task results per Dag).
> > >>
> > >> On 2026/06/16 10:20:44 Ash Berlin-Taylor wrote:
> > >>> TaskFlow automatically suffixes pretty close to this out of the box;
> > >>> I think without the override we’d end up with my_task, my_task__1,
> > >>> my_task__2, my_task__3 etc.
> > >>> https://github.com/apache/airflow/blob/376cecdb9f258fdb6f81f264c48f281c1cd2aeb5/task-sdk/src/airflow/sdk/bases/decorator.py#L111-L150
> > >>>
> > >>> -a
> > >>>
> > >>>> On 16 Jun 2026, at 10:59, Tzu-ping Chung via dev <[email protected]> wrote:
> > >>>>
> > >>>> The loop would not work as-is (since it’d create multiple tasks
> > >>>> with the same id). But as currently designed, you CAN set multiple
> > >>>> result tasks on a dag. The result is always a dict keyed by task_id.
> > >>>> So this slightly modified example
> > >>>>
> > >>>>     @dag
> > >>>>     def my_dag():
> > >>>>         @task
> > >>>>         def t(x):
> > >>>>             return x
> > >>>>
> > >>>>         @result
> > >>>>         @task
> > >>>>         def my_task(num):
> > >>>>             return num*2
> > >>>>
> > >>>>         for i in range(4):
> > >>>>             my_task.override(task_id=f"my_task_{i}")(t(i))
> > >>>>
> > >>>> would have the dag result
> > >>>>
> > >>>>     {
> > >>>>         "my_task_0": 0,
> > >>>>         "my_task_1": 2,
> > >>>>         "my_task_2": 4,
> > >>>>         "my_task_3": 6,
> > >>>>     }
> > >>>>
> > >>>>> On 16 Jun 2026, at 17:34, Ephraim Anierobi <[email protected]> wrote:
> > >>>>>
> > >>>>> Hi TP,
> > >>>>>
> > >>>>> Thanks for bringing up this discussion.
> > >>>>>
> > >>>>> I feel like `@result @task` is clean; however, it won't be clear
> > >>>>> what the Dag's result is if the task is invoked multiple times in a
> > >>>>> dag. Take for example:
> > >>>>>
> > >>>>>     @dag
> > >>>>>     def my_dag():
> > >>>>>         @task
> > >>>>>         def t(x):
> > >>>>>             return x
> > >>>>>
> > >>>>>         @result
> > >>>>>         @task
> > >>>>>         def my_task(num):
> > >>>>>             return num*2
> > >>>>>
> > >>>>>         for i in range(4):
> > >>>>>             my_task(t(i))
> > >>>>>
> > >>>>> Unless I'm misunderstanding @result, I feel like this means every
> > >>>>> invocation of `my_task` is a result of the dag.
> > >>>>>
> > >>>>> If the result is intended to be singular, I would prefer value
> > >>>>> inference from the dag:
> > >>>>>
> > >>>>>     @dag
> > >>>>>     def my_dag():
> > >>>>>         @task
> > >>>>>         def my_task():
> > >>>>>             return 1
> > >>>>>         return my_task()
> > >>>>>
> > >>>>> AND
> > >>>>>
> > >>>>>     with DAG(...) as dag:
> > >>>>>         output = f()
> > >>>>>         dag.add_result(output)
> > >>>>>
> > >>>>> Thanks
> > >>>>> - Ephraim
> > >>>>>
> > >>>>> On Tue, 16 Jun 2026 at 08:37, Tzu-ping Chung via dev <[email protected]> wrote:
> > >>>>> Hi all,
> > >>>>>
> > >>>>> I’m currently working on the [Synchronous Dag Execution] feature
> > >>>>> and trying to gather opinions on how the TaskFlow API should work
> > >>>>> when we want to mark a task as the dag’s “result task” (i.e.
> > >>>>> “the return value is a final output of the dag, not an intermediate
> > >>>>> value”).
> > >>>>>
> > >>>>> [Synchronous Dag Execution]: https://github.com/apache/airflow/issues/51711
> > >>>>>
> > >>>>> ## Prior art (kind of)
> > >>>>>
> > >>>>> We currently have the setup/teardown TaskFlow API like this:
> > >>>>>
> > >>>>>     @setup
> > >>>>>     def f1(): ...
> > >>>>>
> > >>>>>     @task
> > >>>>>     def f2(): ...
> > >>>>>
> > >>>>>     setup1 = f1()  # This is a setup task.
> > >>>>>
> > >>>>>     t2 = f2()  # This is a normal task.
> > >>>>>     setup2 = t2.as_setup()  # This is a setup task.
> > >>>>>
> > >>>>> A teardown variant also exists for both cases.
> > >>>>>
> > >>>>> ## The decorator syntax
> > >>>>>
> > >>>>> The most straightforward syntax would be a @result decorator on a
> > >>>>> plain Python function. However, I don’t like this: setup and teardown
> > >>>>> tasks don’t accept most task arguments, but a result task still takes
> > >>>>> all the same arguments as a non-result task. If @result had to work on
> > >>>>> a plain function, it would need to duplicate and forward all the
> > >>>>> arguments of @task. I feel we can avoid this redundancy by requiring
> > >>>>> @result to be used ON TOP OF @task instead:
> > >>>>>
> > >>>>>     @result
> > >>>>>     @task(put your arguments here...)
> > >>>>>     def f(): ...
> > >>>>>
> > >>>>> We COULD also make using @result without @task a shorthand for an
> > >>>>> argument-less @task (which is probably common?):
> > >>>>>
> > >>>>>     # This...
> > >>>>>     @result
> > >>>>>     def f(): ...
> > >>>>>
> > >>>>>     # Is equivalent to...
> > >>>>>     @result
> > >>>>>     @task
> > >>>>>     def f(): ...
> > >>>>>
> > >>>>> Alternatively, we could use a fluent interface:
> > >>>>>
> > >>>>>     @task(arguments here...).result
> > >>>>>     def f(): ...
> > >>>>>
> > >>>>> Pro: avoids needing a top-level name. Con: not a common pattern in
> > >>>>> Airflow.
> > >>>>>
> > >>>>> ## The method syntax
> > >>>>>
> > >>>>> I don’t think adding a method similar to as_setup()/as_teardown()
> > >>>>> makes sense here.
> > >>>>> It makes sense for setup/teardown because it allows the same body of
> > >>>>> code to be BOTH a setup/teardown task AND a normal task at the same
> > >>>>> time, as shown above. This does not make sense for a result task: a
> > >>>>> task either returns the result, or it doesn’t. If we want a
> > >>>>> method-based syntax, it makes more sense to have a method on the dag:
> > >>>>>
> > >>>>>     with DAG(...) as dag:
> > >>>>>         @task
> > >>>>>         def f(): ...
> > >>>>>
> > >>>>>         t = f()
> > >>>>>         dag.add_result(t)
> > >>>>>
> > >>>>> ## For the @dag decorator
> > >>>>>
> > >>>>> One more syntax, which only makes sense here, is to automatically
> > >>>>> detect the return value of an @dag-decorated function:
> > >>>>>
> > >>>>>     @dag
> > >>>>>     def my_dag():
> > >>>>>         @task
> > >>>>>         def f1(): ...
> > >>>>>
> > >>>>>         @task
> > >>>>>         def f2(v): ...
> > >>>>>
> > >>>>>         result = f2(f1())
> > >>>>>
> > >>>>>         return result  # Marks f2 as the result task!
> > >>>>>
> > >>>>> ---------------
> > >>>>>
> > >>>>> Looking forward to hearing thoughts on the above, and more ideas
> > >>>>> on possible syntaxes.
> > >>>>>
> > >>>>> TP
> > >>>>>
> > >>>>> ---------------------------------------------------------------------
> > >>>>> To unsubscribe, e-mail: [email protected]
> > >>>>> For additional commands, e-mail: [email protected]
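For illustration, here is a toy model of the other proposals in this thread. It covers `@result` stacked on `@task` (including the bare-`@result` shorthand), `dag.add_result()`, and multiple result tasks collected into a dict keyed by task_id. It also includes automatic id suffixing (`x`, `x__1`, `x__2`, ...), which approximates what Ash describes. Everything here (`DAG`, `Task`, `TaskFactory`, `task`, `result`, `override`, `add_result`, `unique_id`) is a hypothetical stand-in, not Airflow's implementation. The usage at the bottom reproduces TP's modified loop example.

```python
# Toy model of the result-task proposals in this thread. Hypothetical names
# throughout (DAG, Task, TaskFactory, task, result, add_result, override);
# this is not Airflow's implementation.
from __future__ import annotations

from typing import Any, Callable

_current_dag = None  # the DAG whose "with" block is currently active


class Task:
    def __init__(self, task_id: str, fn: Callable[..., Any], args: list[Any]) -> None:
        self.task_id = task_id
        self.fn = fn
        self.args = args  # literal values or upstream Task objects

    def run(self) -> Any:
        # Upstream tasks are evaluated recursively; literals pass through.
        return self.fn(*(a.run() if isinstance(a, Task) else a for a in self.args))


class DAG:
    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.tasks: dict[str, Task] = {}
        self.result_ids: list[str] = []

    def __enter__(self) -> DAG:
        global _current_dag
        _current_dag = self
        return self

    def __exit__(self, *exc: object) -> None:
        global _current_dag
        _current_dag = None

    def unique_id(self, task_id: str) -> str:
        # Approximates the suffixing Ash describes: x, x__1, x__2, ...
        if task_id not in self.tasks:
            return task_id
        n = 1
        while f"{task_id}__{n}" in self.tasks:
            n += 1
        return f"{task_id}__{n}"

    def add_result(self, t: Task) -> None:
        self.result_ids.append(t.task_id)

    def result(self) -> dict[str, Any]:
        # One entry per result task, keyed by task_id.
        return {tid: self.tasks[tid].run() for tid in self.result_ids}


class TaskFactory:
    def __init__(self, fn: Callable[..., Any], task_id: str | None = None,
                 is_result: bool = False) -> None:
        self.fn = fn
        self.task_id = task_id or fn.__name__
        self.is_result = is_result

    def override(self, *, task_id: str) -> TaskFactory:
        return TaskFactory(self.fn, task_id, self.is_result)

    def __call__(self, *args: Any) -> Task:
        if _current_dag is None:
            raise RuntimeError("call tasks inside a DAG context")
        t = Task(_current_dag.unique_id(self.task_id), self.fn, list(args))
        _current_dag.tasks[t.task_id] = t
        if self.is_result:
            _current_dag.add_result(t)
        return t


def task(fn: Callable[..., Any]) -> TaskFactory:
    return TaskFactory(fn)


def result(obj: TaskFactory | Callable[..., Any]) -> TaskFactory:
    # Meant to sit ON TOP OF @task; a bare function is treated as a
    # shorthand for @result + argument-less @task.
    factory = obj if isinstance(obj, TaskFactory) else task(obj)
    return TaskFactory(factory.fn, factory.task_id, is_result=True)


with DAG("my_dag") as my_dag:
    @task
    def t(x):
        return x

    @result
    @task
    def my_task(num):
        return num * 2

    for i in range(4):
        my_task.override(task_id=f"my_task_{i}")(t(i))

print(my_dag.result())  # {'my_task_0': 0, 'my_task_1': 2, 'my_task_2': 4, 'my_task_3': 6}
```

Running it prints the same dict TP gives for the modified loop. Without the `.override(...)` call, the result keys in this sketch would instead be `my_task`, `my_task__1`, and so on. That is the ambiguity Ephraim raises.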
