Hello,

At our company we have DAGs that have to process large numbers of paged results (e.g. XCom results from REST endpoints or multiple files from FTP downloads).
The XCom storage itself isn't the issue, as we use a custom provider that lets us store large data on a Persistent Volume Claim without overloading the database (the XCom table only holds a reference to the file path where the XCom data is stored).

The naïve approach would of course be to call partial on the operator and then expand over the paged results, which most of the time means more than 1k tasks. That can become a problem because of the sheer number of task instances, especially when multiple DAGs have the same problem to solve. Cancelling all those tasks can also be a challenge.

I already raised this at an Airflow Town Hall at the time: it would be nice to be able to "stream" the data instead of having to spawn all tasks at once before executing them. So this time I started researching how this could be achieved using the existing Airflow infrastructure, and I implemented a POC which works.

Below is a simplified example of a DAG using the stream functionality:

    from datetime import timedelta

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

    # DEFAULT_ARGS, SQL_CONN_ID and MSGRAPH_CONN_ID are defined elsewhere.
    with DAG(
        "test_streamed_operator",
        default_args=DEFAULT_ARGS,
        schedule_interval=timedelta(hours=24),
        catchup=False,
    ) as dag:
        distinct_users_ids_task = SQLExecuteQueryOperator(
            task_id="distinct_users_ids",
            conn_id=SQL_CONN_ID,
            sql="SELECT TOP 10 ID FROM USERS",
            dag=dag,
        )

        # .stream() is the new method from the POC; it takes the place of .expand().
        MSGraphAsyncOperator.partial(
            task_id="registered_devices",
            conn_id=MSGRAPH_CONN_ID,
            url="users/{userId}/registeredDevices",
            do_xcom_push=False,
            dag=dag,
        ).stream(
            path_parameters=distinct_users_ids_task.output.map(
                lambda user_id: {"userId": user_id[0]}
            )
        )

The result of the above is that the list of user ids returned by the SQLExecuteQueryOperator is "streamed" to the path_parameters parameter of the MSGraphAsyncOperator instead of being expanded. The "tasks" are therefore executed sequentially instead of launching multiple task instances. This functionality also works with deferrable operators like the MSGraphAsyncOperator.
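To make the idea concrete outside of Airflow, here is a minimal plain-Python sketch of what "streaming" means compared to expanding: one process walks over the inputs and calls the same function for each of them, rather than creating one unit of work per input. This is not the POC implementation; the names stream and fetch_registered_devices, the concurrency argument, and the sample rows are hypothetical and only serve as illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def stream(
    func: Callable[[T], R], items: Iterable[T], concurrency: int = 1
) -> Iterator[R]:
    """Apply func to every item inside one process, yielding results in input order."""
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    if concurrency == 1:
        # Sequential case: one call at a time, no extra threads.
        for item in items:
            yield func(item)
        return
    # Concurrent case: a bounded thread pool inside the same process.
    # Executor.map submits all items up front, runs at most `concurrency`
    # of them at a time, and returns the results in input order.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        yield from pool.map(func, items)


def fetch_registered_devices(path_parameters: dict) -> str:
    # Hypothetical stand-in for the REST call: it only builds the URL.
    return "users/{userId}/registeredDevices".format(**path_parameters)


# Made-up rows in the shape a SQL query returns (one tuple per row).
rows = [(1,), (2,), (3,)]
params = ({"userId": row[0]} for row in rows)
urls = list(stream(fetch_registered_devices, params, concurrency=2))
```

With concurrency=1 the calls run strictly one after another; with a higher value they share the threads of the same process, which is the trade-off the optional concurrency setting is about.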
It is also possible to define an optional concurrency parameter so that the operator is executed concurrently without spawning multiple task instances, as opposed to expand. In that case the speed depends on the number of threads available on the current worker: no additional workers are spawned, so this is pure concurrency within the same worker instance. Of course, if your operator is async (deferrable), this setting won't have any effect.

This could be a nice feature for Airflow 3, as it would solve the scenario where expanding into too many task instances becomes problematic and "sequential" execution is faster. And yes, I know you can play with the max_active_tasks and max_active_runs parameters, but this makes the issue I was trying to solve easier to handle, especially the sometimes unnecessary creation of dynamic tasks with all the overhead that comes with it.

So I'm curious about your thoughts on this proposal.

Kind regards,
David