This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage_dont_run_tis_executor
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 975e1a61afa86a3c5edf56b2e5b7024dba160cfd
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Aug 13 18:10:09 2023 +0200

    openlineage: don't run task instance listener in executor
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index d85a559f56..4a6b75f677 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 import logging
-from concurrent.futures import Executor, ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
 
@@ -42,8 +42,8 @@ class OpenLineageListener:
     """OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
 
     def __init__(self):
+        self._executor = None
         self.log = logging.getLogger(__name__)
-        self.executor: Executor = None  # type: ignore
         self.extractor_manager = ExtractorManager()
         self.adapter = OpenLineageAdapter()
 
@@ -102,7 +102,7 @@ class OpenLineageListener:
                 },
             )
 
-        self.executor.submit(on_running)
+        on_running()
 
     @hookimpl
     def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session):
@@ -130,7 +130,7 @@ class OpenLineageListener:
                 task=task_metadata,
             )
 
-        self.executor.submit(on_success)
+        on_success()
 
     @hookimpl
     def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):
@@ -158,12 +158,17 @@ class OpenLineageListener:
                 task=task_metadata,
             )
 
-        self.executor.submit(on_failure)
+        on_failure()
+
+    @property
+    def executor(self):
+        if not self._executor:
+            self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+        return self._executor
 
     @hookimpl
     def on_starting(self, component):
         self.log.debug("on_starting: %s", component.__class__.__name__)
-        self.executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
 
     @hookimpl
     def before_stopping(self, component):
@@ -174,9 +179,6 @@ class OpenLineageListener:
 
     @hookimpl
     def on_dag_run_running(self, dag_run: DagRun, msg: str):
-        if not self.executor:
-            self.log.error("Executor have not started before `on_dag_run_running`")
-            return
         data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
         data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
         self.executor.submit(

Reply via email to