ashb commented on a change in pull request #15389:
URL: https://github.com/apache/airflow/pull/15389#discussion_r677399435



##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -0,0 +1,410 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import os
+import signal
+import sys
+import threading
+import time
+from collections import deque
+from typing import Deque, Dict, List, Optional, Set, Tuple, Type
+
+from airflow.compat.asyncio import create_task
+from airflow.jobs.base_job import BaseJob
+from airflow.models.trigger import Trigger
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string
+
+
+class TriggererJob(BaseJob):
+    """
+    TriggererJob continuously runs active triggers in asyncio, watching
+    for them to fire off their events and then dispatching that information
+    to their dependent tasks/DAGs.
+
+    It runs as two threads:
+     - The main thread does DB calls/checkins
+     - A subthread runs all the async code
+    """
+
+    __mapper_args__ = {'polymorphic_identity': 'TriggererJob'}
+
+    partition_ids: Optional[List[int]] = None
+    partition_total: Optional[int] = None
+
+    def __init__(self, partition=None, *args, **kwargs):
+        # Make sure we can actually run
+        if not hasattr(asyncio, "create_task"):
+            raise RuntimeError("The triggerer/deferred operators only work on Python 3.7 and above.")

Review comment:
       This will be handled already by the `from airflow.compat.asyncio import create_task` on L27
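
   For reference, a minimal sketch of what that compat shim presumably does (the exact contents live in `airflow/compat/asyncio.py`; this is an illustration, not the verbatim file):

   ```python
   # Hypothetical sketch of airflow/compat/asyncio.py
   import sys

   if sys.version_info >= (3, 7):
       from asyncio import create_task
   else:
       def create_task(*args, **kwargs):
           # asyncio.create_task does not exist on Python 3.6, so calling
           # this fallback surfaces a clear error instead of an AttributeError.
           raise RuntimeError("The triggerer/deferred operators only work on Python 3.7 and above.")
   ```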

##########
File path: airflow/models/taskinstance.py
##########
@@ -1182,6 +1228,22 @@ def _run_raw_task(
                 self._prepare_and_execute_task_with_callbacks(context, task)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
+        except TaskDeferred as defer:
+            # The task has signalled it wants to defer execution based on
+            # a trigger.
+            self._defer_task(defer=defer)
+            self.log.info(
+                'Pausing task as DEFERRED. ' 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s',

Review comment:
       ```suggestion
                'Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s',
       ```
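
   (Adjacent string literals are concatenated at parse time, so the original already produces a single string; the suggestion just merges them into one literal for readability. A quick check:)

   ```python
   # Implicit concatenation of adjacent literals happens at parse time:
   assert 'Pausing task as DEFERRED. ' 'dag_id=%s' == 'Pausing task as DEFERRED. dag_id=%s'
   ```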

##########
File path: airflow/models/trigger.py
##########
@@ -0,0 +1,195 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from typing import Any, Dict, List, Optional
+
+from sqlalchemy import BigInteger, Column, String, func
+
+from airflow.models.base import Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.triggers.base import BaseTrigger
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.state import State
+
+
+class Trigger(Base):
+    """
+    Triggers are a workload that run in an asynchronous event loop shared with
+    other Triggers, and fire off events that will unpause deferred Tasks,
+    start linked DAGs, etc.
+
+    They are persisted into the database and then re-hydrated into a single
+    "triggerer" process, where they're all run at once. We model it so that
+    there is a many-to-one relationship between Task and Trigger, for future
+    deduplication logic to use.
+
+    Rows will be evicted from the database when the triggerer detects no
+    active Tasks/DAGs using them. Events are not stored in the database;
+    when an Event is fired, the triggerer will directly push its data to the
+    appropriate Task/DAG.
+    """
+
+    __tablename__ = "trigger"
+
+    id = Column(BigInteger, primary_key=True)
+    classpath = Column(String(1000), nullable=False)
+    kwargs = Column(ExtendedJSON, nullable=False)
+    created_date = Column(UtcDateTime, nullable=False)
+    triggerer_id = Column(BigInteger, nullable=True)
+
+    def __init__(
+        self, classpath: str, kwargs: Dict[str, Any], created_date: Optional[datetime.datetime] = None
+    ):
+        super().__init__()
+        self.classpath = classpath
+        self.kwargs = kwargs
+        self.created_date = created_date or timezone.utcnow()
+
+    @classmethod
+    def from_object(cls, trigger: BaseTrigger):
+        """
+        Alternative constructor that creates a trigger row based directly
+        off of a Trigger object.
+        """
+        classpath, kwargs = trigger.serialize()
+        return cls(classpath=classpath, kwargs=kwargs)
+
+    @classmethod
+    @provide_session
+    def bulk_fetch(cls, ids: List[int], session=None) -> Dict[int, "Trigger"]:
+        """
+        Fetches all of the Triggers by ID and returns a dict mapping
+        ID -> Trigger instance
+        """
+        return {obj.id: obj for obj in session.query(cls).filter(cls.id.in_(ids)).all()}
+
+    @classmethod
+    @provide_session
+    def clean_unused(cls, session=None):
+        """
+        Deletes all triggers that have no tasks/DAGs dependent on them
+        (triggers have a one-to-many relationship to both)
+        """
+        # Update all task instances with trigger IDs that are not DEFERRED to remove them
+        session.query(TaskInstance).filter(
+            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+        ).update({TaskInstance.trigger_id: None})
+        # Get all triggers that have no task instances depending on them...
+        ids = [
+            trigger_id
+            for (trigger_id,) in (
+                session.query(cls.id)
+                .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True)
+                .group_by(cls.id)
+                .having(func.count(TaskInstance.trigger_id) == 0)
+            )
+        ]
+        # ...and delete them (we can't do this in one query due to MySQL)
+        session.query(Trigger).filter(Trigger.id.in_(ids)).delete(synchronize_session=False)
+
+    @classmethod
+    @provide_session
+    def submit_event(cls, trigger_id, event, session=None):
+        """
+        Takes an event from an instance of itself, and triggers all dependent
+        tasks to resume.
+        """
+        for task_instance in session.query(TaskInstance).filter(
+            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+        ):
+            # Add the event's payload into the kwargs for the task
+            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs["event"] = event.payload
+            task_instance.next_kwargs = next_kwargs
+            # Remove ourselves as its trigger
+            task_instance.trigger_id = None
+            # Finally, mark it as scheduled so it gets re-queued
+            task_instance.state = State.SCHEDULED
+
+    @classmethod
+    @provide_session
+    def submit_failure(cls, trigger_id, session=None):
+        """
+        Called when a trigger has failed unexpectedly, and we need to mark
+        everything that depended on it as failed. Notably, we have to actually
+        run the failure code from a worker as it may have linked callbacks, so
+        hilariously we have to re-schedule the task instances to a worker just
+        so they can then fail.
+
+        We use a special __fail__ value for next_method to achieve this that
+        the runtime code understands as immediate-fail, and pack the error into
+        next_kwargs.
+
+        TODO: Once we have shifted callback (and email) handling to run on
+        workers as first-class concepts, we can run the failure code here
+        in-process, but we can't do that right now.
+        """
+        for task_instance in session.query(TaskInstance).filter(
+            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+        ):
+            # Add the error and set the next_method to the fail state
+            task_instance.next_method = "__fail__"
+            task_instance.next_kwargs = {"error": "Trigger failure"}
+            # Remove ourselves as its trigger
+            task_instance.trigger_id = None
+            # Finally, mark it as scheduled so it gets re-queued
+            task_instance.state = State.SCHEDULED
+
+    @classmethod
+    @provide_session
+    def ids_for_triggerer(cls, triggerer_id, session=None):
+        """Retrieves a list of triggerer_ids."""
+        return [row[0] for row in session.query(cls.id).filter(cls.triggerer_id == triggerer_id)]
+
+    @classmethod
+    @provide_session
+    def assign_unassigned(cls, triggerer_id, capacity, session=None):
+        """
+        Takes a triggerer_id and the capacity for that triggerer and assigns unassigned
+        triggers until that capacity is reached, or there are no more unassigned triggers.
+        """
+        from airflow.jobs.base_job import BaseJob  # To avoid circular import
+
+        count = session.query(cls.id).filter(cls.triggerer_id == triggerer_id).count()
+        capacity -= count
+
+        if capacity <= 0:
+            return
+
+        alive_triggerer_ids = [
+            row[0]
+            for row in session.query(BaseJob.id).filter(
+                BaseJob.end_date.is_(None),
+                BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
+                BaseJob.job_type == "TriggererJob",
+            )
+        ]
+
+        # find triggers which have a triggerer_id not in list
+        trigger_ids = [
+            row[0]
+            for row in session.query(cls.id)
+            .filter(cls.triggerer_id.notin_(alive_triggerer_ids))
+            .limit(capacity)
+        ]
+
+        session.query(cls).filter(cls.id.in_(trigger_ids)).update(
+            {cls.triggerer_id: triggerer_id},
+            synchronize_session=False,
+        )

Review comment:
       Splitting this into two queries feels a bit racy to me.
   
   We should either:
   - Combine it into a single SQL query,
   - Use `FOR UPDATE` locking, or
   - Extend the conditional on the update so that it only updates rows where the `triggerer_id` is still the "old" value (None, or not in `alive_triggerer_ids`).
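
   A rough sketch of the third option, reusing the names from the code above:

   ```python
   from sqlalchemy import or_

   # Only claim rows whose triggerer_id is still "stale", so we don't
   # overwrite a concurrent triggerer that claimed them in between.
   session.query(cls).filter(
       cls.id.in_(trigger_ids),
       or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)),
   ).update({cls.triggerer_id: triggerer_id}, synchronize_session=False)
   ```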

##########
File path: docs/apache-airflow/concepts/deferring.rst
##########
@@ -0,0 +1,174 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Deferrable Operators & Triggers
+===============================
+
+Standard :doc:`Operators <operators>` and :doc:`Sensors <sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that's currently running but idle, then you *cannot run anything else* - even though your entire Airflow cluster is essentially idle. ``reschedule`` mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else.
+
+This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot, and your cluster will waste far fewer resources on idle Operators or Sensors.
+
+*Triggers* are small, asynchronous pieces of Python code designed to be run all together in a single Python process; because they are asynchronous, they are able to all co-exist efficiently. As an overview of how this process works:
+
+* A task instance (running operator) gets to a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. This frees up the worker to run something else.
+* The new Trigger instance is registered inside Airflow, and picked up by one or more *triggerer* processes.
+* The trigger is run until it fires, at which point its source task is re-scheduled.
+* The scheduler queues the task to resume on a worker node.
+
+Using deferrable operators as a DAG author is almost transparent; writing them, however, takes a bit more work.
+
+.. note::
+
+    Deferrable Operators & Triggers rely on more recent ``asyncio`` features, 
and as a result only work
+    on Python 3.7 or higher.
+
+
+Using Deferrable Operators
+--------------------------
+
+If all you wish to do is use pre-written Deferrable Operators (such as ``TimeSensorAsync``, which comes with Airflow), then there are only two steps you need:
+
+* Ensure your Airflow installation is running at least one ``triggerer`` process, as well as the normal ``scheduler``
+* Use deferrable operators/sensors in your DAGs
+
+That's it; everything else will be automatically handled for you. If you're upgrading existing DAGs, we even provide some API-compatible sensor variants (e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG with no other changes required.
+
+Note that you cannot yet use the deferral ability from inside custom PythonOperator/TaskFlow Python functions; it is only available to pre-built Operators at the moment.
+
+
+Writing Deferrable Operators
+----------------------------
+
+Writing a deferrable operator takes a bit more work. The main points to consider are:
+
+* Your Operator must defer itself based on a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.
+* Your Operator will be stopped and removed from its worker while deferred, and no state will persist automatically. You can persist state by asking Airflow to resume you at a certain method or pass certain kwargs, but that's it.
+* You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
+* Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors.
+
+
+Triggering Deferral
+~~~~~~~~~~~~~~~~~~~
+
+If you want to trigger deferral, at any place in your Operator you can call ``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a special exception that Airflow will catch. The arguments are:
+
+* ``trigger``: An instance of a Trigger that you wish to defer on. It will be serialized into the database.
+* ``method_name``: The method name on your Operator you want Airflow to call when it resumes, other than ``execute``.
+* ``kwargs``: Additional keyword arguments to pass to the method when it is called. Optional, defaults to ``{}``.
+* ``timeout``: A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Optional, defaults to ``None``, meaning no timeout.
+
+When you opt to defer, your Operator will *stop executing at that point and be removed from its current worker*. No state will persist, and when your Operator is resumed it will be a *brand new instance*. The only way you can pass state from the old instance of the Operator to the new one is via ``method_name`` and ``kwargs``.

Review comment:
       > No state will persist
   
   could be a little more explicit. How about
   
   
   > No state such as local variables or attributes set on ``self`` will persist
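
   Maybe with a short example too; a rough sketch (the operator, trigger, and helper here are made up for illustration):

   ```python
   from airflow.models import BaseOperator

   class ExampleAsyncOperator(BaseOperator):
       def execute(self, context):
           job_id = start_remote_job()  # hypothetical helper doing some work
           # job_id would be lost on resume unless handed over explicitly:
           self.defer(
               trigger=SomeTrigger(job_id=job_id),  # hypothetical trigger
               method_name="execute_complete",
               kwargs={"job_id": job_id},
           )

       def execute_complete(self, context, event=None, job_id=None):
           # Runs on a *fresh instance*: attributes set on self in execute()
           # are gone; only the trigger event and the kwargs above survive.
           self.log.info("Job %s finished with event %s", job_id, event)
   ```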

##########
File path: airflow/models/trigger.py
##########
@@ -0,0 +1,195 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from typing import Any, Dict, List, Optional
+
+from sqlalchemy import BigInteger, Column, String, func
+
+from airflow.models.base import Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.triggers.base import BaseTrigger
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.state import State
+
+
+class Trigger(Base):
+    """
+    Triggers are a workload that run in an asynchronous event loop shared with
+    other Triggers, and fire off events that will unpause deferred Tasks,
+    start linked DAGs, etc.
+
+    They are persisted into the database and then re-hydrated into a single
+    "triggerer" process, where they're all run at once. We model it so that

Review comment:
       This isn't quite true anymore, is it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

