ashb commented on a change in pull request #15389: URL: https://github.com/apache/airflow/pull/15389#discussion_r677399435
########## File path: airflow/jobs/triggerer_job.py ##########

@@ -0,0 +1,410 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import os
+import signal
+import sys
+import threading
+import time
+from collections import deque
+from typing import Deque, Dict, List, Optional, Set, Tuple, Type
+
+from airflow.compat.asyncio import create_task
+from airflow.jobs.base_job import BaseJob
+from airflow.models.trigger import Trigger
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string
+
+
+class TriggererJob(BaseJob):
+    """
+    TriggererJob continuously runs active triggers in asyncio, watching
+    for them to fire off their events and then dispatching that information
+    to their dependent tasks/DAGs.
+
+    It runs as two threads:
+     - The main thread does DB calls/checkins
+     - A subthread runs all the async code
+    """
+
+    __mapper_args__ = {'polymorphic_identity': 'TriggererJob'}
+
+    partition_ids: Optional[List[int]] = None
+    partition_total: Optional[int] = None
+
+    def __init__(self, partition=None, *args, **kwargs):
+        # Make sure we can actually run
+        if not hasattr(asyncio, "create_task"):
+            raise RuntimeError("The triggerer/deferred operators only work on Python 3.7 and above.")

Review comment:
       This will be handled already by the `from airflow.compat.asyncio import create_task` on L 27
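A minimal sketch of the kind of version shim the reviewer is pointing at (the body here is an assumption; the real `airflow.compat.asyncio` module may be written differently):

```python
# Hypothetical sketch of a create_task compat shim; the actual
# airflow.compat.asyncio module may differ.
import asyncio
import sys

if sys.version_info >= (3, 7):
    from asyncio import create_task
else:

    def create_task(coro):
        # asyncio.ensure_future is the closest pre-3.7 equivalent: it wraps
        # the coroutine in a Task on the currently running event loop.
        return asyncio.ensure_future(coro)
```

Importing at module load time means any Python-version problem surfaces once, at import, rather than via an ad-hoc `hasattr` check in `__init__`.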
########## File path: airflow/models/taskinstance.py ##########

@@ -1182,6 +1228,22 @@ def _run_raw_task(
             self._prepare_and_execute_task_with_callbacks(context, task)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
+        except TaskDeferred as defer:
+            # The task has signalled it wants to defer execution based on
+            # a trigger.
+            self._defer_task(defer=defer)
+            self.log.info(
+                'Pausing task as DEFERRED. ' 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s',

Review comment:
       ```suggestion
                'Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s',
       ```
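For context, the `TaskDeferred` exception caught above is raised by the operator's own `defer` call. A sketch of that mechanism, with the signature assumed from how the hunk uses it:

```python
# Sketch of the defer mechanism that _run_raw_task catches above
# (signature assumed from how TaskDeferred is used in the hunk).
from airflow.exceptions import TaskDeferred


def defer(self, *, trigger, method_name, kwargs=None, timeout=None):
    # Raising here unwinds out of execute(); _run_raw_task catches the
    # exception and persists the trigger plus resume point via _defer_task.
    raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)
```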
########## File path: airflow/models/trigger.py ##########

@@ -0,0 +1,195 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from typing import Any, Dict, List, Optional
+
+from sqlalchemy import BigInteger, Column, String, func
+
+from airflow.models.base import Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.triggers.base import BaseTrigger
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.state import State
+
+
+class Trigger(Base):
+    """
+    Triggers are a workload that run in an asynchronous event loop shared with
+    other Triggers, and fire off events that will unpause deferred Tasks,
+    start linked DAGs, etc.
+
+    They are persisted into the database and then re-hydrated into a single
+    "triggerer" process, where they're all run at once. We model it so that
+    there is a many-to-one relationship between Task and Trigger, for future
+    deduplication logic to use.
+
+    Rows will be evicted from the database when the triggerer detects no
+    active Tasks/DAGs using them. Events are not stored in the database;
+    when an Event is fired, the triggerer will directly push its data to the
+    appropriate Task/DAG.
+    """
+
+    __tablename__ = "trigger"
+
+    id = Column(BigInteger, primary_key=True)
+    classpath = Column(String(1000), nullable=False)
+    kwargs = Column(ExtendedJSON, nullable=False)
+    created_date = Column(UtcDateTime, nullable=False)
+    triggerer_id = Column(BigInteger, nullable=True)
+
+    def __init__(
+        self, classpath: str, kwargs: Dict[str, Any], created_date: Optional[datetime.datetime] = None
+    ):
+        super().__init__()
+        self.classpath = classpath
+        self.kwargs = kwargs
+        self.created_date = created_date or timezone.utcnow()
+
+    @classmethod
+    def from_object(cls, trigger: BaseTrigger):
+        """
+        Alternative constructor that creates a trigger row based directly
+        off of a Trigger object.
+        """
+        classpath, kwargs = trigger.serialize()
+        return cls(classpath=classpath, kwargs=kwargs)
+
+    @classmethod
+    @provide_session
+    def bulk_fetch(cls, ids: List[int], session=None) -> Dict[int, "Trigger"]:
+        """
+        Fetches all of the Triggers by ID and returns a dict mapping
+        ID -> Trigger instance
+        """
+        return {obj.id: obj for obj in session.query(cls).filter(cls.id.in_(ids)).all()}
+
+    @classmethod
+    @provide_session
+    def clean_unused(cls, session=None):
+        """
+        Deletes all triggers that have no tasks/DAGs dependent on them
+        (triggers have a one-to-many relationship to both)
+        """
+        # Update all task instances with trigger IDs that are not DEFERRED to remove them
+        session.query(TaskInstance).filter(
+            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+        ).update({TaskInstance.trigger_id: None})
+        # Get all triggers that have no task instances depending on them...
+        ids = [
+            trigger_id
+            for (trigger_id,) in (
+                session.query(cls.id)
+                .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True)
+                .group_by(cls.id)
+                .having(func.count(TaskInstance.trigger_id) == 0)
+            )
+        ]
+        # ...and delete them (we can't do this in one query due to MySQL)
+        session.query(Trigger).filter(Trigger.id.in_(ids)).delete(synchronize_session=False)
+
+    @classmethod
+    @provide_session
+    def submit_event(cls, trigger_id, event, session=None):
+        """
+        Takes an event from an instance of itself, and triggers all dependent
+        tasks to resume.
+        """
+        for task_instance in session.query(TaskInstance).filter(
+            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+        ):
+            # Add the event's payload into the kwargs for the task
+            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs["event"] = event.payload
+            task_instance.next_kwargs = next_kwargs
+            # Remove ourselves as its trigger
+            task_instance.trigger_id = None
+            # Finally, mark it as scheduled so it gets re-queued
+            task_instance.state = State.SCHEDULED
+
+    @classmethod
+    @provide_session
+    def submit_failure(cls, trigger_id, session=None):
+        """
+        Called when a trigger has failed unexpectedly, and we need to mark
+        everything that depended on it as failed. Notably, we have to actually
+        run the failure code from a worker as it may have linked callbacks, so
+        hilariously we have to re-schedule the task instances to a worker just
+        so they can then fail.
+
+        We use a special __fail__ value for next_method to achieve this that
+        the runtime code understands as immediate-fail, and pack the error into
+        next_kwargs.
+
+        TODO: Once we have shifted callback (and email) handling to run on
+        workers as first-class concepts, we can run the failure code here
+        in-process, but we can't do that right now.
+        """
+        for task_instance in session.query(TaskInstance).filter(
+            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+        ):
+            # Add the error and set the next_method to the fail state
+            task_instance.next_method = "__fail__"
+            task_instance.next_kwargs = {"error": "Trigger failure"}
+            # Remove ourselves as its trigger
+            task_instance.trigger_id = None
+            # Finally, mark it as scheduled so it gets re-queued
+            task_instance.state = State.SCHEDULED
+
+    @classmethod
+    @provide_session
+    def ids_for_triggerer(cls, triggerer_id, session=None):
+        """Retrieves the list of trigger IDs assigned to the given triggerer."""
+        return [row[0] for row in session.query(cls.id).filter(cls.triggerer_id == triggerer_id)]
+
+    @classmethod
+    @provide_session
+    def assign_unassigned(cls, triggerer_id, capacity, session=None):
+        """
+        Takes a triggerer_id and the capacity for that triggerer and assigns unassigned
+        triggers until that capacity is reached, or there are no more unassigned triggers.
+        """
+        from airflow.jobs.base_job import BaseJob  # To avoid circular import
+
+        count = session.query(cls.id).filter(cls.triggerer_id == triggerer_id).count()
+        capacity -= count
+
+        if capacity <= 0:
+            return
+
+        alive_triggerer_ids = [
+            row[0]
+            for row in session.query(BaseJob.id).filter(
+                BaseJob.end_date.is_(None),
+                BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
+                BaseJob.job_type == "TriggererJob",
+            )
+        ]
+
+        # Find triggers whose triggerer_id is not in the alive list
+        trigger_ids = [
+            row[0]
+            for row in session.query(cls.id)
+            .filter(cls.triggerer_id.notin_(alive_triggerer_ids))
+            .limit(capacity)
+        ]
+
+        session.query(cls).filter(cls.id.in_(trigger_ids)).update(
+            {cls.triggerer_id: triggerer_id},
+            synchronize_session=False,
+        )

Review comment:
       By splitting this into two queries this feels a bit racy to me. Either we should:

       - combine it into a single SQL query,
       - use `FOR UPDATE` locking, or
       - extend the conditional on the update so that it only updates rows where the `triggerer_id` is still the "old" value (None, or not in `alive_triggerer_ids`).
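A hedged sketch of the reviewer's third option, as a drop-in change to the final `update` call above: make the UPDATE itself re-check that each row is still unassigned or orphaned, so a concurrent triggerer cannot claim the same rows between the SELECT and the UPDATE (`or_` is the only new import):

```python
# Sketch of the third option: re-check inside the UPDATE that each row's
# triggerer_id is still the "old" value, closing the race window.
from sqlalchemy import or_

session.query(cls).filter(
    cls.id.in_(trigger_ids),
    or_(
        cls.triggerer_id.is_(None),
        cls.triggerer_id.notin_(alive_triggerer_ids),
    ),
).update({cls.triggerer_id: triggerer_id}, synchronize_session=False)
```

Rows claimed by another triggerer in the window simply fail the extra predicate and stay untouched; the next heartbeat can top the capacity back up.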
+ """ + from airflow.jobs.base_job import BaseJob # To avoid circular import + + count = session.query(cls.id).filter(cls.triggerer_id == triggerer_id).count() + capacity -= count + + if capacity <= 0: + return + + alive_triggerer_ids = [ + row[0] + for row in session.query(BaseJob.id).filter( + BaseJob.end_date is None, + BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30), + BaseJob.job_type == "TriggererJob", + ) + ] + + # find triggers which have a triggerer_id not in list + trigger_ids = [ + row[0] + for row in session.query(cls.id) + .filter(cls.triggerer_id.notin_(alive_triggerer_ids)) + .limit(capacity) + ] + + session.query(cls).filter(cls.id.in_(trigger_ids)).update( + {cls.triggerer_id: triggerer_id}, + synchronize_session=False, + ) Review comment: By splitting this in to two queries this feels a bit racy to me. Either we should - Combine in to a single SQL query - Use `FOR UPDATE` locking, or - Extend the conditional on the update so that it only updates rows where the trigger_id is still the "old" value (None, or not in alive_triggerer_ids). ########## File path: docs/apache-airflow/concepts/deferring.rst ########## @@ -0,0 +1,174 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Deferrable Operators & Triggers +=============================== + +Standard :doc:`Operators <operators>` and :doc:`Sensors <sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that's currently running but idle, then you *cannot run anything else* - even though your entire Airflow cluster is essentially idle. ``reschedule`` mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else. + +This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors. + +*Triggers* are small, asynchronous pieces of Python code designed to be run all together in a single Python process; because they are asynchronous, they are able to all co-exist efficiently. As an overview of how this process works: + +* A task instance (running operator) gets to a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. This frees up the worker to run something else. 
########## File path: airflow/models/trigger.py ##########

@@ -0,0 +1,195 @@
+class Trigger(Base):
+    """
+    Triggers are a workload that run in an asynchronous event loop shared with
+    other Triggers, and fire off events that will unpause deferred Tasks,
+    start linked DAGs, etc.
+
+    They are persisted into the database and then re-hydrated into a single
+    "triggerer" process, where they're all run at once. We model it so that

Review comment:
       This isn't quite true anymore, is it?
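The docstring's "single process" wording predates multi-triggerer support; each triggerer now claims only a share of the table. A hedged sketch of how a ``TriggererJob`` heartbeat might claim its share using the methods shown earlier (attribute names like ``self.capacity`` and ``self.trigger_runner`` are assumptions):

```python
# Sketch of a per-heartbeat claim loop; several TriggererJob instances can
# run this concurrently, each taking up to its own capacity of triggers.
from airflow.models.trigger import Trigger


def load_triggers(self):
    # Claim unassigned/orphaned triggers, up to this triggerer's capacity
    Trigger.assign_unassigned(self.id, self.capacity)
    ids = Trigger.ids_for_triggerer(self.id)
    # Hand the claimed set to the async runner thread (assumed attribute)
    self.trigger_runner.update_triggers(set(ids))
```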
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
