kaxil commented on code in PR #63500: URL: https://github.com/apache/airflow/pull/63500#discussion_r2934450538
########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _sdk-deferred-vs-async-operators: + +Deferred vs Async Operators +=========================== + + .. versionadded:: 3.2.0

Review Comment: The leading space before `.. versionadded::` will cause Sphinx to treat this as a block quote instead of a directive. It won't render the version badge.

```suggestion
.. versionadded:: 3.2.0
```

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Airflow contains Python native async support, enabling task authors to leverage asynchronous I/O for high-throughput workloads. +It is important to understand how this differs from deferred operators. + +Deferred Operators +------------------ + +A deferred operator is an operator that can pause its execution until an external trigger event occurs, +without holding a worker slot. For more details see :doc:`authoring-and-scheduling/deferring`.

Review Comment: This `:doc:` reference points to `authoring-and-scheduling/deferring`, which lives in `airflow-core/docs/`, not `task-sdk/docs/`. It will fail during the doc build. Since the task-sdk Sphinx config has intersphinx mapping to the core docs as `"airflow"`, this should work:

```
:doc:`airflow:authoring-and-scheduling/deferring`
```

Alternatively, a direct URL to the published docs would also work.

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Examples include the HttpOperator in deferrable mode, sensors or operators integrated with triggers. + +Key characteristics: + + - Execution is paused while waiting for external events or resources. + - Worker slots are freed during the wait, improving resource efficiency. + - Ideal for scenarios where a single external event or a small number of events dictate task completion. + +Typically simpler to use, as no custom async logic is required as this is all handled by the deferred operator. + +Async Python Operators +---------------------- + +Python native async operators allow you to write tasks that leverage Python's asyncio: + + - Tasks can perform many concurrent I/O operations efficiently within a single worker slot sharing the same event loop. + - Task code uses async/await syntax with async-compatible hooks, such as HttpAsyncHook or the SFTPHookAsync.

Review Comment: Two things:

1. **Missing `BaseAsyncOperator`**: This section only covers `@task async def` (taskflow API). Users writing class-based operators should know about `BaseAsyncOperator` (defined in `airflow.sdk`). At minimum a mention like: "For class-based operators, subclass `BaseAsyncOperator`."

2. **"sharing the same event loop"** (line 48): Each task instance gets its own event loop (via `asyncio.run()`). The concurrency is *within* that single task's loop, not shared across task instances. "sharing the same event loop" could mislead users into thinking multiple tasks share one loop. Consider: "using a single event loop within the task."

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Ideal when you need to perform high-throughput operations (e.g., many HTTP requests, database calls, or API interactions) within a single task instance, +or when there is no deferred operator available but there is an async hook available. + +When to Use Deferred Operators +------------------------------ + +Prefer a deferred operator when: + + - There is an existing deferrable operator that covers your use case (e.g., HttpOperator deferrable mode). + - The task waits for a single or limited external events. + - You want to free worker resources while waiting for triggers. + - You don't need to loop over the same operator multiple times (e.g. multiplexing). + +.. code-block:: python + + from airflow.providers.http.operators.http import HttpOperator + + task_get_op = HttpOperator( + http_conn_id="http_conn_id", + task_id="get_op", + method="GET", + endpoint="get", + data={"param1": "value1", "param2": "value2"}, + deferrable=True, + ) + +When to Use Async Python Operators +---------------------------------- + +Use async Python operators when: + + - The task needs to perform many concurrent requests or operations within a single task. + - You want to take advantage of the shared event loop to improve throughput. + - There is simply no deferred operator available. + - The logic depends on custom Python code (e.g. callables or lambdas) that cannot easily be implemented in a trigger, since triggers must be serializable and do not have access to DAG code at runtime. + +.. code-block:: python + + import asyncio + from aiohttp import ClientSession + from airflow.providers.http.hooks.http import HttpAsyncHook + from airflow.sdk import task + + parameters = [ + {"param1": "value1", "param2": "value2"}, + {"param1": "value3", "param2": "value4"}, + {"param1": "value5", "param2": "value6"}, + {"param1": "value7", "param2": "value8"}, + ] + + @task + async def get_op(parameters: list[dict[str, str]]): + hook = HttpAsyncHook(http_conn_id="http_conn_id", method="GET") + + async with ClientSession() as session: + tasks = [ + hook.run(session=session, endpoint="get", data=params) + for params in parameters + ] + # Run all requests concurrently in the shared event loop for high throughput + responses = await asyncio.gather(*tasks) + return [await r.json() for r in responses] + + get_op(parameters)

Review Comment: A few issues with this example:

1. **Missing DAG context**: The example calls `get_op(parameters)` at line 114 but never shows a `@dag`-decorated function. Every other example in the task-sdk docs shows a complete, runnable DAG. A user copying this won't get a working DAG file.

2. **Undeclared `aiohttp` dependency**: `from aiohttp import ClientSession` — nothing tells the user they need this package installed. Either add a note ("requires `aiohttp`, included with the HTTP provider") or restructure to use only the hook's session management.

3. **Complexity jump**: The deferred example is 6 lines; this is 20+ with `ClientSession`, `asyncio.gather`, and nested comprehensions. This unintentionally signals "async = hard." Consider showing a simple async example first (single call), then this multiplexing example as an advanced follow-up.

4. **No guidance on limits**: Real use cases may fire hundreds of concurrent requests. A brief note about using semaphores or batching to avoid overwhelming downstream services would help.
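The bounded-concurrency point can be sketched with plain asyncio, independent of Airflow or aiohttp; `fetch` here is a hypothetical stand-in for an async hook call such as `HttpAsyncHook.run()`:

```python
import asyncio


async def fetch(params: dict[str, str]) -> dict[str, str]:
    # Stand-in for a real async I/O call (e.g. an HTTP request via an async hook).
    await asyncio.sleep(0.01)
    return params


async def gather_bounded(parameter_sets: list[dict[str, str]], limit: int = 2):
    # A semaphore caps how many calls are in flight at once, so a large
    # parameter list cannot overwhelm the downstream service.
    sem = asyncio.Semaphore(limit)

    async def bounded(params: dict[str, str]) -> dict[str, str]:
        async with sem:
            return await fetch(params)

    # gather() preserves input order in its results.
    return await asyncio.gather(*(bounded(p) for p in parameter_sets))


parameters = [{"param1": f"value{i}"} for i in range(8)]
results = asyncio.run(gather_bounded(parameters, limit=2))
```

At most `limit` requests run concurrently; the rest queue on the semaphore, which gives readers a concrete pattern for the "semaphores or batching" suggestion without adding new dependencies to the doc example.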
########## task-sdk/docs/index.rst: ##########
@@ -160,5 +160,6 @@ For the full public API reference, see the :doc:`api` page. examples dynamic-task-mapping + deferred-vs-async-operators

Review Comment: The new page is in the toctree but isn't mentioned in any of the numbered sections (1–6) on the index page. Users browsing the index linearly won't discover it. Consider adding a brief mention under Section 2 (DAG Authoring) — something like "For choosing between deferred and async approaches, see :doc:`deferred-vs-async-operators`."

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Airflow contains Python native async support, enabling task authors to leverage asynchronous I/O for high-throughput workloads. +It is important to understand how this differs from deferred operators.

Review Comment: A user landing here needs a quick mental model of *how* each approach works before choosing between them. Consider replacing these two lines with something like:

> Airflow 3.2 adds Python-native async support for tasks, allowing concurrent I/O within a single worker slot. This page explains how async operators differ from deferred operators and when to use each.

The "It is important to understand" phrasing is filler — just state the comparison directly.

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Key characteristics: + + - Execution is paused while waiting for external events or resources. + - Worker slots are freed during the wait, improving resource efficiency. + - Ideal for scenarios where a single external event or a small number of events dictate task completion. + +Typically simpler to use, as no custom async logic is required as this is all handled by the deferred operator.

Review Comment: This sentence is structurally orphaned — it sits outside the bullet list but reads like it should be the closing bullet. A reader skimming the bullets will miss it entirely. Either make it the last bullet:

```rst
- Typically simpler to use, as the deferred operator handles all async logic.
```

Or separate it clearly as its own paragraph with more context.

########## task-sdk/docs/deferred-vs-async-operators.rst: ##########
@@ -0,0 +1,118 @@ +Compared to Dynamic Task Mapping with many deferrable operators, the approach with Async Python Operator is that all execution shares one worker slot and is sharing a single event loop. +In contrast with Dynamic Task Mapping each list element is tracked as an individual task, needs individual scheduling but on the other hand can be repeated individually. +For more details about Dynamic Task Mapping, see the :ref:`dynamic task mapping <sdk-dynamic-mapping>` page.

Review Comment: Two issues here:

1. **Broken `:ref:`**: `sdk-dynamic-mapping` should be `sdk-dynamic-task-mapping` (the label defined in `dynamic-task-mapping.rst` line 18). This will produce a Sphinx warning and a dead link.

2. **Content**: This comparison deserves more structure — it's one of the most valuable parts of the page. A table would be much more scannable:

```rst
.. list-table::
   :header-rows: 1

   * - Aspect
     - Async ``@task``
     - Dynamic Task Mapping (deferrable)
   * - Worker slots
     - 1 (shared event loop)
     - N (one per mapped instance)
   * - Individual retry
     - No — whole task retries
     - Yes — per mapped instance
   * - UI visibility
     - Single task
     - N tasks visible individually
   * - Scheduling overhead
     - None
     - Scheduler must schedule N instances
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
