This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d529ec8d45 Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235)
d529ec8d45 is described below
commit d529ec8d4572b4b9e97344974b2aa960c8a90ae6
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Wed May 15 10:24:00 2024 +0200
Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235)
Make `max_workers` configurable.
Signed-off-by: Jakub Dardzinski <[email protected]>
---
airflow/providers/openlineage/conf.py | 7 +++++++
airflow/providers/openlineage/plugins/listener.py | 5 +++--
.../providers/openlineage/plugins/test_listener.py | 21 +++++++++++++++++++++
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py
index d43806abca..23e663f67e 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -104,3 +104,10 @@ def is_disabled() -> bool:
# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
+
+
+@cache
+def dag_state_change_process_pool_size() -> int:
+ """[openlineage] dag_state_change_process_pool_size."""
+ option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1)
+ return option
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 9067d53f69..73f8c8c79e 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,7 +17,7 @@
from __future__ import annotations
import logging
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING
@@ -25,6 +25,7 @@ from openlineage.client.serde import Serde
from airflow import __version__ as airflow_version
from airflow.listeners import hookimpl
+from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
@@ -281,7 +282,7 @@ class OpenLineageListener:
@property
def executor(self):
if not self._executor:
- self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+ self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size())
return self._executor
@hookimpl
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index fa651de1b2..d9fbb0dfd3 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope
listener.adapter.complete_task.assert_not_called()
+@pytest.mark.parametrize(
+ "max_workers,expected",
+ [
+ (None, 1),
+ ("8", 8),
+ ],
+)
+@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True)
+def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected):
+ """mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers"""
+ listener = OpenLineageListener()
+ # mock ProcessPoolExecutor class
+ try:
+ with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}):
+ listener.on_dag_run_running(mock.MagicMock(), None)
+ mock_executor.assert_called_once_with(max_workers=expected)
+ mock_executor.return_value.submit.assert_called_once()
+ finally:
+ conf.dag_state_change_process_pool_size.cache_clear()
+
+
class TestOpenLineageSelectiveEnable:
def setup_method(self):
self.dag = DAG(