Hello,

At our company we have DAGs which have to process lots of paged results (e.g. 
XCom results from REST endpoints or multiple files from FTP downloads).

The XCom isn't the issue, as we use a custom provider which allows us to store 
large data on a Persistent Volume Claim without overloading the database (the 
XCom table only holds the reference to the file path where the XCom data is 
stored).
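The general idea of such a reference-based XCom can be sketched without Airflow: 
the value is written to a file on shared storage and only the file path is kept 
as the XCom value. This is a minimal, hypothetical sketch (XCOM_ROOT and the 
helper names are illustrative, not our actual provider); inside Airflow this 
logic would typically live in a BaseXCom subclass overriding serialize_value 
and deserialize_value.

```python
import json
import tempfile
import uuid
from pathlib import Path

# Hypothetical mount point of the Persistent Volume Claim; a temp dir keeps the sketch runnable.
XCOM_ROOT = Path(tempfile.mkdtemp(prefix="xcom-"))


def store_value(value) -> str:
    """Write the (possibly large) value to shared storage and return only its path."""
    path = XCOM_ROOT / f"{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(value))
    return str(path)  # this short string is all the XCom table would need to hold


def load_value(reference: str):
    """Resolve a stored reference back into the original value."""
    return json.loads(Path(reference).read_text())


rows = [[i] for i in range(10_000)]  # lists, because JSON round-trips tuples as lists
ref = store_value(rows)
print(load_value(ref) == rows)  # True
```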

The naïve approach would of course be to call partial on the operator and then 
expand over the paged results, which most of the time means more than 1,000 
tasks.
This can become a problem because of the sheer number of tasks, especially if 
you have multiple DAGs solving the same problem.
Also, cancelling all those tasks can sometimes be a challenge.

I already raised this issue at an Airflow Town Hall: it would be nice to be 
able to "stream" the data instead of having to spawn all tasks up front before 
executing them.
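To make the "stream" idea concrete: a Python generator can fetch pages lazily, 
so each item can be processed as soon as its page arrives and nothing is 
materialised up front. A stdlib-only sketch; fetch_page and FAKE_PAGES are 
hypothetical stand-ins for a REST call or an FTP listing.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical paged source: token -> (items, next_token); next_token None means last page.
FAKE_PAGES = {None: ([1, 2, 3], "p2"), "p2": ([4, 5, 6], "p3"), "p3": ([7], None)}
fetched: List[Optional[str]] = []  # records which pages were actually requested


def fetch_page(token: Optional[str]) -> Tuple[List[int], Optional[str]]:
    fetched.append(token)
    return FAKE_PAGES[token]


def stream_items(first_token: Optional[str] = None) -> Iterator[int]:
    """Yield items one by one, requesting the next page only when it is needed."""
    token = first_token
    while True:
        items, token = fetch_page(token)
        yield from items
        if token is None:
            return


stream = stream_items()
first_two = [next(stream), next(stream)]
print(first_two, fetched)  # [1, 2] [None]  (only the first page was requested)
```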

So this time I researched how this could be achieved using the existing 
Airflow infrastructure, and I implemented a working POC.

Below is a simplified example of a DAG using the stream functionality:

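from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

# DEFAULT_ARGS, SQL_CONN_ID and MSGRAPH_CONN_ID are defined elsewhere.
# Note: .stream() is provided by the POC, it is not part of Airflow itself.

with DAG(
    "test_streamed_operator",
    default_args=DEFAULT_ARGS,
    schedule_interval=timedelta(hours=24),
    catchup=False,
) as dag:
    # Each row returned by the query is a tuple; column 0 holds the user ID.
    distinct_users_ids_task = SQLExecuteQueryOperator(
        task_id="distinct_users_ids",
        conn_id=SQL_CONN_ID,
        sql="SELECT TOP 10 ID FROM USERS",
    )

    # Stream each user ID into path_parameters instead of expanding
    # one task instance per user.
    MSGraphAsyncOperator.partial(
        task_id="registered_devices",
        conn_id=MSGRAPH_CONN_ID,
        url="users/{userId}/registeredDevices",
        do_xcom_push=False,
    ).stream(
        path_parameters=distinct_users_ids_task.output.map(
            lambda row: {"userId": row[0]}
        )
    )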

As a result, the list of user IDs returned by the SQLExecuteQueryOperator is 
"streamed" to the path_parameters parameter of the MSGraphAsyncOperator instead 
of being expanded.
The "tasks" are then executed sequentially instead of launching multiple task 
instances. This functionality also works with deferrable operators like the 
MSGraphAsyncOperator.
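Conceptually, streaming replaces N mapped task instances with one task instance 
that loops over the mapped input and runs the operator logic once per element. 
The sketch below is hypothetical and Airflow-free: FakeGraphOperator stands in 
for an operator with an execute() method, and stream_execute is an 
illustrative name, not the POC's actual code.

```python
from typing import Any, Callable, Dict, Iterable, List, Type


class FakeGraphOperator:
    """Hypothetical stand-in for an operator; only the execute() shape matters."""

    def __init__(self, url: str, path_parameters: Dict[str, Any]):
        self.url = url
        self.path_parameters = path_parameters

    def execute(self, context: Dict[str, Any]) -> str:
        # A real operator would call the endpoint; here we only resolve the URL.
        return self.url.format(**self.path_parameters)


def stream_execute(
    operator_class: Type[Any],
    partial_kwargs: Dict[str, Any],
    arg_name: str,
    values: Iterable[Any],
    transform: Callable[[Any], Any],
    context: Dict[str, Any],
) -> List[Any]:
    """Run the operator once per value, sequentially, inside a single task."""
    results = []
    for value in values:  # consumed one at a time, never expanded up front
        operator = operator_class(**partial_kwargs, **{arg_name: transform(value)})
        results.append(operator.execute(context))
    return results


rows = [(101,), (102,), (103,)]  # query rows as tuples; column 0 is the user ID
out = stream_execute(
    FakeGraphOperator,
    {"url": "users/{userId}/registeredDevices"},
    "path_parameters",
    rows,
    lambda row: {"userId": row[0]},
    context={},
)
print(out[0])  # users/101/registeredDevices
```

In this sketch an exception from one element aborts the whole loop, so a retry 
would apply to the entire stream rather than to a single element.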

It's also possible to define an optional concurrency parameter, so that the 
operator is executed concurrently without spawning multiple task instances, as 
expand would.
In that case the speed depends on the number of threads available on the 
current worker: no additional workers are spawned, so this is pure concurrency 
within the same worker instance.
Of course, if your operator is async (deferrable), this setting has no effect.

This could be a nice feature for Airflow 3, as it would solve the scenario 
where expanding too many task instances becomes problematic and "sequential" 
execution is faster.
And yes, I know you can play with the max_active_tasks and max_active_runs 
parameters, but this approach addresses the issue I was trying to solve more 
directly, especially the sometimes unnecessary creation of dynamic tasks with 
all the overhead that comes with it.

So I'm curious to hear your thoughts on this proposal.

Kind regards,
David
