This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 325444e381 Allow more parameters to be piped through via execute_in_subprocess (#23286)
325444e381 is described below
commit 325444e3813bc2439566f6f29c07ff3cfc038368
Author: Charles Machalow <[email protected]>
AuthorDate: Tue May 17 08:47:19 2022 -0700
Allow more parameters to be piped through via execute_in_subprocess (#23286)
---
airflow/utils/process_utils.py | 14 ++++++++++++--
tests/utils/test_process_utils.py | 10 +++++++++-
2 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 96ca4b3480..ca8fc2433e 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -158,13 +158,23 @@ def reap_process_group(
def execute_in_subprocess(cmd: List[str], cwd: Optional[str] = None) -> None:
"""
Execute a process and stream output to logger
-
:param cmd: command and arguments to run
:param cwd: Current working directory passed to the Popen constructor
"""
+ execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
+
+
+def execute_in_subprocess_with_kwargs(cmd: List[str], **kwargs) -> None:
+ """
+ Execute a process and stream output to logger
+
+ :param cmd: command and arguments to run
+
+ All other keyword args will be passed directly to subprocess.Popen
+ """
log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
with subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, cwd=cwd
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
) as proc:
log.info("Output:")
if proc.stdout:
diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py
index 65f49a54a4..ab76a05172 100644
--- a/tests/utils/test_process_utils.py
+++ b/tests/utils/test_process_utils.py
@@ -35,7 +35,11 @@ import pytest
from airflow.exceptions import AirflowException
from airflow.utils import process_utils
-from airflow.utils.process_utils import check_if_pidfile_process_is_running, execute_in_subprocess
+from airflow.utils.process_utils import (
+ check_if_pidfile_process_is_running,
+ execute_in_subprocess,
+ execute_in_subprocess_with_kwargs,
+)
class TestReapProcessGroup(unittest.TestCase):
@@ -117,6 +121,10 @@ class TestExecuteInSubProcess:
with pytest.raises(CalledProcessError):
process_utils.execute_in_subprocess(["bash", "-c", "exit 1"])
+ def test_using_env_as_kwarg_works(self, caplog):
+ execute_in_subprocess_with_kwargs(["bash", "-c", 'echo "My value is ${VALUE}"'], env=dict(VALUE="1"))
+ assert "My value is 1" in caplog.text
+
def my_sleep_subprocess():
sleep(100)