andrewgodwin commented on a change in pull request #15389: URL: https://github.com/apache/airflow/pull/15389#discussion_r638995366
########## File path: docs/apache-airflow/concepts/deferring.rst ##########

@@ -0,0 +1,172 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Deferrable Operators & Triggers
+===============================
+
+Standard :doc:`Operators <operators>` and :doc:`Sensors <sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle. For example, if you only have 100 worker slots available to run tasks, and you have 100 DAGs waiting on a Sensor that is currently running but idle, then you *cannot run anything else*, even though your entire Airflow cluster is essentially idle.
+
+This is where *Deferrable Operators* come in. A deferrable operator is written with the ability to suspend itself and remove itself from the worker when it knows it will have to wait, handing off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes far fewer resources on idle Operators or Sensors.
+
+*Triggers* are small, asynchronous pieces of Python code designed to run all together in a single Python process; because they are asynchronous, they can all co-exist efficiently.
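The claim that many asynchronous triggers can co-exist efficiently in one process can be illustrated with a minimal ``asyncio`` sketch. This is purely a toy model, not Airflow's Trigger API; all names here (``toy_trigger``, ``toy_triggerer``) are hypothetical:

```python
import asyncio


async def toy_trigger(name: str, delay: float) -> str:
    # A stand-in for a trigger: it spends almost all of its life awaiting,
    # so hundreds of these can share one event loop without blocking each other.
    await asyncio.sleep(delay)
    return f"{name} fired"


async def toy_triggerer() -> list:
    # A stand-in for the triggerer process: one event loop driving many
    # small async waiters concurrently.
    waiters = [toy_trigger(f"trigger-{i}", 0.01 * i) for i in range(5)]
    return await asyncio.gather(*waiters)


if __name__ == "__main__":
    print(asyncio.run(toy_triggerer()))
```

Because each waiter yields control while sleeping, the total runtime is roughly that of the slowest waiter rather than the sum of all of them, which is why a single triggerer process can supervise a large number of deferred tasks.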
+As an overview of how this process works:
+
+* A task instance (running operator) reaches a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. It then removes itself from its current worker, freeing up the slot.
+* The new Trigger instance is registered inside Airflow and picked up by one or more *triggerer* processes.
+* The trigger runs until it fires, at which point its source task is re-scheduled.
+* The task instance resumes.
+
+Using deferrable operators as a DAG author is almost transparent; writing them, however, takes a bit more work.
+
+.. note::
+
+    Deferrable Operators & Triggers rely on more recent ``asyncio`` features, and as a result only work
+    on Python 3.7 or higher.
+
+
+Using Deferrable Operators
+--------------------------
+
+If all you wish to do is use pre-written Deferrable Operators (such as ``TimeSensorAsync``, which comes with Airflow), then there are only two steps you need:
+
+* Ensure your Airflow installation is running at least one *triggerer* process, as well as the normal *scheduler*
+* Use deferrable operators/sensors in your DAGs
+
+That's it; everything else will be handled for you automatically. If you're upgrading existing DAGs, we even provide some API-compatible sensor variants (e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG with no other changes required.
+
+Note that you cannot yet use the deferral ability from inside custom PythonOperator/TaskFlow code; it is only available to pre-built Operators at the moment.
+
+
+Writing Deferrable Operators
+----------------------------
+
+Writing a deferrable operator takes a bit more work. The main points to consider are:
+
+* Your Operator must defer itself based on a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.
+* Your Operator will be deleted and removed from its worker while deferred, and no state will persist automatically.
+  You can persist state by asking Airflow to resume you at a certain method or to pass certain kwargs, but that's it.
+* You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
+* Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors.
+
+
+Triggering Deferral
+~~~~~~~~~~~~~~~~~~~
+
+If you want to trigger deferral, at any place in your Operator you can call ``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a special exception that Airflow will catch. The arguments are:
+
+* ``trigger``: An instance of a Trigger that you wish to defer on. It will be serialized into the database.
+* ``method_name``: The method name on your Operator that you want Airflow to call when it resumes, other than ``execute``.
+* ``kwargs``: Additional keyword arguments to pass to the method when it is called. Optional; defaults to ``{}``.
+* ``timeout``: A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Optional; defaults to ``None``, meaning no timeout.

Review comment:
   I think that is how I would interpret it too, and I suspect a bit of work is needed on that front.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
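The diff above says ``self.defer(...)`` works by raising a special exception that Airflow catches, then later re-invoking the operator at ``method_name``. A self-contained toy model of that defer/catch/resume handoff might look like the following. Every name here (``ToyTaskDeferred``, ``ToyOperator``, ``toy_run``) is hypothetical and greatly simplified; in real Airflow the trigger runs in a separate triggerer process and the resume happens via the scheduler:

```python
from datetime import timedelta


class ToyTaskDeferred(Exception):
    """Toy stand-in for the special exception that self.defer() raises."""

    def __init__(self, trigger, method_name, kwargs=None, timeout=None):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}
        self.timeout = timeout


class ToyOperator:
    def defer(self, *, trigger, method_name, kwargs=None, timeout=None):
        # Raising immediately unwinds execute(), which is what frees the
        # worker slot while the trigger waits elsewhere.
        raise ToyTaskDeferred(trigger, method_name, kwargs, timeout)

    def execute(self, context):
        # Defer until some (toy) trigger fires, resuming at execute_complete.
        self.defer(
            trigger="toy-time-trigger",
            method_name="execute_complete",
            timeout=timedelta(hours=1),
        )

    def execute_complete(self, context, event=None):
        # Re-invoked in a fresh worker slot once the trigger fires.
        return f"resumed with event={event}"


def toy_run(op, context):
    # Toy scheduler loop: catch the deferral, pretend the trigger fired,
    # then call back into the named resume method.
    try:
        return op.execute(context)
    except ToyTaskDeferred as d:
        return getattr(op, d.method_name)(context, event="fired", **d.kwargs)


print(toy_run(ToyOperator(), context={}))  # prints "resumed with event=fired"
```

The exception-based handoff is the key design point the doc hints at: because deferral unwinds the stack rather than blocking in place, no operator state survives except what is captured in the trigger, the ``method_name``, and the ``kwargs``.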
