This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage_dont_run_tis_executor
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 975e1a61afa86a3c5edf56b2e5b7024dba160cfd Author: Maciej Obuchowski <[email protected]> AuthorDate: Sun Aug 13 18:10:09 2023 +0200 openlineage: don't run task instance listener in executor Signed-off-by: Maciej Obuchowski <[email protected]> --- airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index d85a559f56..4a6b75f677 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -17,7 +17,7 @@ from __future__ import annotations import logging -from concurrent.futures import Executor, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import TYPE_CHECKING @@ -42,8 +42,8 @@ class OpenLineageListener: """OpenLineage listener sends events on task instance and dag run starts, completes and failures.""" def __init__(self): + self._executor = None self.log = logging.getLogger(__name__) - self.executor: Executor = None # type: ignore self.extractor_manager = ExtractorManager() self.adapter = OpenLineageAdapter() @@ -102,7 +102,7 @@ class OpenLineageListener: }, ) - self.executor.submit(on_running) + on_running() @hookimpl def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session): @@ -130,7 +130,7 @@ class OpenLineageListener: task=task_metadata, ) - self.executor.submit(on_success) + on_success() @hookimpl def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session): @@ -158,12 +158,17 @@ class OpenLineageListener: task=task_metadata, ) - self.executor.submit(on_failure) + on_failure() + + @property + def executor(self): + if not self._executor: + self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_") + return self._executor @hookimpl def on_starting(self, component): 
self.log.debug("on_starting: %s", component.__class__.__name__) - self.executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_") @hookimpl def before_stopping(self, component): @@ -174,9 +179,6 @@ class OpenLineageListener: @hookimpl def on_dag_run_running(self, dag_run: DagRun, msg: str): - if not self.executor: - self.log.error("Executor have not started before `on_dag_run_running`") - return data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None self.executor.submit(
