ashb commented on a change in pull request #20443:
URL: https://github.com/apache/airflow/pull/20443#discussion_r778038070



##########
File path: airflow/__init__.py
##########
@@ -80,6 +80,11 @@ def __getattr__(name):
     manager.initialize_providers_hooks()
     manager.initialize_providers_extra_links()
 
+if settings.EXECUTE_LISTENERS_ON_SCHEDULER:
+    from airflow.plugins_manager import integrate_listener_plugins
+
+    integrate_listener_plugins()

Review comment:
       Given the name of this setting, I wonder if this code should be somewhere 
in SchedulerJob instead?

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -71,6 +74,8 @@ def __init__(
         # terminate multiple times
         self.terminating = False
 
+        self._enable_task_listeners()

Review comment:
       I think this should be moved into `_execute` instead of the 
constructor -- it (currently) doesn't make much difference to anything other 
than tests, but currently the constructor otherwise does nothing other than 
create an object, so let's keep it that way.

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       I'm not sure about the need for the class here. Is there a reason we 
don't follow the "pluggy" way here and do:
   
   ```suggestion
           listeners = [on_task_instance_running]
   ```

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -167,9 +172,14 @@ definitions in Airflow.
 
     # Importing base classes that we need to derive
     from airflow.hooks.base import BaseHook
+    from airflow.listeners.listener import Listener
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.providers.amazon.aws.transfers.gcs_to_s3 import 
GCSToS3Operator
 
+    from pluggy import HookimplMarker
+
+    hookimpl = HookimplMarker("airflow")

Review comment:
       Looking at the pluggy docs, it appears the normal pattern here is to 
import the hookimpl from a module rather than to create it here: 
https://pluggy.readthedocs.io/en/latest/#the-plugin

##########
File path: docs/apache-airflow/plugins.rst
##########
@@ -268,6 +285,7 @@ definitions in Airflow.
         operator_extra_links = [
             S3LogLink(),
         ]
+        listeners = [PrintingRunningListener]

Review comment:
       The disadvantage of using a class is that it might imply that you can store 
state in the instance between plugin invocations (which isn't the case, not in 
general terms anyway), whereas a function might make that clearer.

##########
File path: airflow/jobs/local_task_job.py
##########
@@ -291,3 +296,13 @@ def _update_dagrun_state_for_paused_dag(self, 
session=None):
             if dag_run:
                 dag_run.dag = dag
                 dag_run.update_state(session=session, execute_callbacks=True)
+
+    @staticmethod
+    def _enable_task_listeners():
+        """
+        Check if we have any registered listeners, then register sqlalchemy 
hooks for
+        TI state change if we do.
+        """
+        integrate_listener_plugins()
+        if get_listener_manager().has_listeners():

Review comment:
       `integrate_listener_plugins` should be called automatically from inside 
`get_listener_manager` I think.

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property 
for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:

Review comment:
       Maybe make this a property?
   
   ```suggestion
       @property
       def has_listeners(self) -> bool:
   ```

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()
+    global _is_listening
+
+    def on_task_instance_state_session_flush(session, flush_context):
+        """
+        Listens for session.flush() events that modify TaskInstance's state, 
and notify listeners that listen
+        for that event. Doing it this way enable us to be stateless in the 
SQLAlchemy event listener.
+        """
+        if not get_listener_manager().has_listeners():
+            return
+        for state in flush_context.states:
+            if isinstance(state.object, TaskInstance) and session.is_modified(
+                state.object, include_collections=False
+            ):
+                added, unchanged, deleted = 
flush_context.get_attribute_history(state, 'state')
+
+                logger.warning(
+                    "session flush listener: added %s unchanged %s deleted %s 
- %s",
+                    added,
+                    unchanged,
+                    deleted,
+                    state.object,
+                )

Review comment:
       Move this to debug level now?

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec
+
+log = logging.getLogger(__name__)
+
+
+class Listener:
+    """Class used as a namespace for listener hook implementation namespace"""
+
+
+_listener_manager = None
+
+
+class ListenerManager:
+    """Class that manages registration of listeners and provides hook property 
for calling them"""
+
+    def __init__(self):
+        self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookspecs(spec)
+        self.listener_names: Set[str] = set()
+
+    def has_listeners(self) -> bool:
+        return len(self.pm.get_plugins()) > 0
+
+    @property
+    def hook(self) -> "_HookRelay":
+        """Returns hook, on which plugin methods specified in spec can be 
called."""
+        return self.pm.hook
+
+    def add_listener(self, listener: Listener):
+        if listener.__class__.__name__ in self.listener_names:
+            return
+        if self.pm.is_registered(listener):
+            return
+
+        listener_type = type(listener)
+        if not (
+            inspect.isclass(listener_type)
+            and issubclass(listener_type, Listener)
+            and (listener_type is not Listener)
+        ):
+            log.warning("Can't register listener: %s - is not a Listener 
subclass", listener_type)
+            return

Review comment:
       We shouldn't need to do any of this -- one of the main features of 
Pluggy is that it handles signature validation for us!
   
   https://pluggy.readthedocs.io/en/latest/#enforcing

##########
File path: airflow/listeners/listener.py
##########
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import inspect
+import logging
+from typing import TYPE_CHECKING, Set
+
+import pluggy
+
+if TYPE_CHECKING:
+    from pluggy._hooks import _HookRelay
+
+from airflow.listeners import spec

Review comment:
       Delay this import until ListenerManager init?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3434,6 +3435,12 @@ def test_timeout_triggers(self, dag_maker):
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
 
+    @pytest.fixture(autouse=True, scope='function')
+    def clean_listener(self):
+        get_listener_manager().clear()
+        yield
+        get_listener_manager().clear()

Review comment:
       Why is this needed in SchedulerJob tests?

##########
File path: airflow/listeners/spec.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import TYPE_CHECKING, Optional
+
+from pluggy import HookspecMarker
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
+    from airflow.models import TaskInstance
+    from airflow.utils.state import State
+
+hookspec = HookspecMarker("airflow")
+
+
+@hookspec
+def on_task_instance_running(
+    previous_state: "State", task_instance: "TaskInstance", session: 
Optional["Session"]

Review comment:
       ```suggestion
       previous_state: "TaskInstanceState", task_instance: "TaskInstance", 
session: Optional["Session"]
   ```
   
   in a few places.

##########
File path: tests/listeners/test_listeners.py
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pluggy
+import pytest as pytest
+
+from airflow import AirflowException
+from airflow.listeners.events import register_task_instance_state_events
+from airflow.listeners.listener import Listener, get_listener_manager
+from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+DAG_ID = "test_listener_dag"
+TASK_ID = "test_listener_task"
+EXECUTION_DATE = timezone.utcnow()
+
+
+hookimpl = pluggy.HookimplMarker("airflow")
+
+
+class CollectingListener(Listener):
+    def __init__(self):
+        self.states = []
+
+
+class PartialListener(CollectingListener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        self.states.append(State.RUNNING)
+
+
+class FullListener(PartialListener):
+    @hookimpl
+    def on_task_instance_success(self, previous_state, task_instance, session):
+        self.states.append(State.SUCCESS)
+
+    @hookimpl
+    def on_task_instance_failed(self, previous_state, task_instance, session):
+        self.states.append(State.FAILED)
+
+
+class ThrowingListener(Listener):
+    @hookimpl
+    def on_task_instance_running(self, previous_state, task_instance, session):
+        raise RuntimeError()
+
+
[email protected](scope="module", autouse=True)
+def register_events():
+    register_task_instance_state_events()
+
+
[email protected](autouse=True)
+def clean_listener_manager():
+    lm = get_listener_manager()
+    lm.clear()
+    yield
+    lm = get_listener_manager()
+    lm.clear()
+
+
+@provide_session
+def test_listener_gets_calls(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 2
+    assert listener.states == [State.RUNNING, State.SUCCESS]
+
+
+@provide_session
+def test_listener_gets_only_subscribed_calls(create_task_instance, 
session=None):
+    lm = get_listener_manager()
+    listener = PartialListener()
+    lm.add_listener(listener)
+
+    ti = create_task_instance(session=session, state=State.QUEUED)
+    ti.run()
+
+    assert len(listener.states) == 1
+    assert listener.states == [State.RUNNING]
+
+
+@provide_session
+def test_listener_throws_exceptions(create_task_instance, session=None):
+    lm = get_listener_manager()
+    listener = ThrowingListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(RuntimeError):
+        ti = create_task_instance(session=session, state=State.QUEUED)
+        ti.run()
+
+
+def test_listener_needs_to_subclass_listener():
+    lm = get_listener_manager()
+
+    class Dummy:
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, 
session):
+            pass
+
+    lm.add_listener(Dummy())
+    assert not lm.has_listeners()
+
+
+@provide_session
+def 
test_listener_captures_failed_taskinstances(create_task_instance_of_operator, 
session=None):
+    lm = get_listener_manager()
+    listener = FullListener()
+    lm.add_listener(listener)
+
+    with pytest.raises(AirflowException):
+        ti = create_task_instance_of_operator(
+            BashOperator, dag_id=DAG_ID, execution_date=EXECUTION_DATE, 
task_id=TASK_ID, bash_command="exit 1"
+        )
+        ti.run()

Review comment:
       Hmmm, if we can avoid calling `ti.run()` here in this test, that would be 
good. (It does _a lot_ and is a relatively heavyweight approach, so if we can do 
something lighter here, that would be preferable.)

##########
File path: airflow/listeners/events.py
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from sqlalchemy import event
+from sqlalchemy.orm import Session
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.models import TaskInstance
+from airflow.utils.state import State
+
+_is_listening = False
+
+
+def register_task_instance_state_events():
+    logger = logging.getLogger()

Review comment:
       ```suggestion
       logger = logging.getLogger(__name__)
   ```

##########
File path: tests/plugins/test_plugins_manager.py
##########
@@ -350,6 +357,21 @@ class MacroPlugin(AirflowPlugin):
             # rendering templates.
             assert hasattr(macros, MacroPlugin.name)
 
+    def test_registering_plugin_listeners(self):
+        from airflow import plugins_manager
+
+        with mock.patch('airflow.plugins_manager.plugins', []):
+            plugins_manager.load_plugins_from_plugin_directory()
+            plugins_manager.integrate_listener_plugins()
+
+            assert get_listener_manager().has_listeners()
+            assert 
get_listener_manager().pm.get_plugins().pop().__class__.__name__ == 
"PluginListener"
+
+    @pytest.fixture(autouse=True)
+    def clear_listeners(self):
+        yield
+        get_listener_manager().clear()

Review comment:
       ```suggestion
   ```
   
   We already have this as a module-level fixture.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to