This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d529ec8d45 Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235)
d529ec8d45 is described below

commit d529ec8d4572b4b9e97344974b2aa960c8a90ae6
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Wed May 15 10:24:00 2024 +0200

    Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. (#39235)
    
    Make `max_workers` configurable.
    
    Signed-off-by: Jakub Dardzinski <[email protected]>
---
 airflow/providers/openlineage/conf.py               |  7 +++++++
 airflow/providers/openlineage/plugins/listener.py   |  5 +++--
 .../providers/openlineage/plugins/test_listener.py  | 21 +++++++++++++++++++++
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py
index d43806abca..23e663f67e 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -104,3 +104,10 @@ def is_disabled() -> bool:
     # Check if both 'transport' and 'config_path' are not present and also
     # if legacy 'OPENLINEAGE_URL' environment variables is not set
     return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
+
+
+@cache
+def dag_state_change_process_pool_size() -> int:
+    """[openlineage] dag_state_change_process_pool_size."""
+    option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1)
+    return option
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 9067d53f69..73f8c8c79e 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 import logging
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from typing import TYPE_CHECKING
 
@@ -25,6 +25,7 @@ from openlineage.client.serde import Serde
 
 from airflow import __version__ as airflow_version
 from airflow.listeners import hookimpl
+from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
 from airflow.providers.openlineage.utils.utils import (
@@ -281,7 +282,7 @@ class OpenLineageListener:
     @property
     def executor(self):
         if not self._executor:
-            self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
+            self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size())
         return self._executor
 
     @hookimpl
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index fa651de1b2..d9fbb0dfd3 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope
     listener.adapter.complete_task.assert_not_called()
 
 
+@pytest.mark.parametrize(
+    "max_workers,expected",
+    [
+        (None, 1),
+        ("8", 8),
+    ],
+)
+@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True)
+def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected):
+    """mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers"""
+    listener = OpenLineageListener()
+    # mock ProcessPoolExecutor class
+    try:
+        with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}):
+            listener.on_dag_run_running(mock.MagicMock(), None)
+        mock_executor.assert_called_once_with(max_workers=expected)
+        mock_executor.return_value.submit.assert_called_once()
+    finally:
+        conf.dag_state_change_process_pool_size.cache_clear()
+
+
 class TestOpenLineageSelectiveEnable:
     def setup_method(self):
         self.dag = DAG(

Reply via email to