This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3761024cb1d Ensure a `WatchedSubprocess` redacts only its values and not parent processes' (#48527)
3761024cb1d is described below

commit 3761024cb1d45ca6a0f788ecb828cfd305c5cffc
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Mar 29 18:31:01 2025 +0530

    Ensure a `WatchedSubprocess` redacts only its values and not parent processes' (#48527)
---
 airflow-core/src/airflow/dag_processing/manager.py   |  4 ++++
 .../src/airflow/jobs/triggerer_job_runner.py         |  4 ++++
 .../src/airflow/sdk/execution_time/secrets_masker.py | 18 ++++++++++++++++++
 .../src/airflow/sdk/execution_time/supervisor.py     | 12 +++---------
 task-sdk/src/airflow/sdk/log.py                      | 10 +++++-----
 .../task_sdk/definitions/test_secrets_masker.py      | 20 ++++++++++++++++++++
 6 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 1dee1ce61dd..6b84eb442d1 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -233,6 +233,10 @@ class DagFileProcessorManager(LoggingMixin):
         By processing them in separate processes, we can get parallelism and isolation
         from potentially harmful user code.
         """
+        from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker
+
+        reset_secrets_masker()
+
         self.register_exit_signals()
 
         self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index bcf0bb9bb96..30c7a64ed44 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -316,6 +316,10 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
 
     def run(self) -> None:
         """Run synchronously and handle all database reads/writes."""
+        from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker
+
+        reset_secrets_masker()
+
         while not self.stop:
             if not self.is_alive():
                 log.error("Trigger runner process has died! Exiting.")
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
index f5b30b3362b..7089291ed91 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
@@ -130,6 +130,19 @@ def _secrets_masker() -> SecretsMasker:
     )
 
 
+def reset_secrets_masker() -> None:
+    """
+    Reset the secrets masker to clear existing patterns and replacer.
+
+    This utility ensures that an execution environment starts with a fresh masker,
+    preventing any carry over of patterns or replacer from previous execution or parent processes.
+
+    New processor types should invoke this method when setting up their own masking to avoid
+    inheriting masking rules from existing execution environments.
+    """
+    _secrets_masker().reset_masker()
+
+
 @cache
 def _get_v1_env_var_type() -> type:
     try:
@@ -351,6 +364,11 @@ class SecretsMasker(logging.Filter):
             for v in secret:
                 self.add_mask(v, name)
 
+    def reset_masker(self):
+        """Reset the patterns and the replacer in the masker instance."""
+        self.patterns = set()
+        self.replacer = None
+
 
 class RedactedIO(TextIO):
     """
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index f8272e39c75..133e97ce9a5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1078,14 +1078,6 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
     return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
 
 
-def register_secrets_masker():
-    """Register the secrets masker to mask task logs."""
-    from airflow.sdk.execution_time.secrets_masker import get_sensitive_variables_fields, mask_secret
-
-    for field in get_sensitive_variables_fields():
-        mask_secret(field)
-
-
 def supervise(
     *,
     ti: TaskInstance,
@@ -1113,6 +1105,8 @@ def supervise(
     :return: Exit code of the process.
     """
     # One or the other
+    from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker
+
     if not client and ((not server) ^ dry_run):
         raise ValueError(f"Can only specify one of {server=} or {dry_run=}")
 
@@ -1145,7 +1139,7 @@ def supervise(
 
     ensure_secrets_backend_loaded()
 
-    register_secrets_masker()
+    reset_secrets_masker()
 
     process = ActivitySubprocess.start(
         dag_rel_path=dag_rel_path,
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 75e8a28b816..bfaf5002468 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -133,9 +133,7 @@ class StdBinaryStreamHandler(logging.StreamHandler):
 
 
 @cache
-def logging_processors(
-    enable_pretty_log: bool,
-):
+def logging_processors(enable_pretty_log: bool, mask_secrets: bool = True):
     if enable_pretty_log:
         timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f")
     else:
@@ -148,10 +146,12 @@ def logging_processors(
         structlog.stdlib.PositionalArgumentsFormatter(),
         logger_name,
         redact_jwt,
-        mask_logs,
         structlog.processors.StackInfoRenderer(),
     ]
 
+    if mask_secrets:
+        processors.append(mask_logs)
+
     # Imports to suppress showing code from these modules. We need the import to get the filepath for
     # structlog to ignore.
     import contextlib
@@ -255,7 +255,7 @@ def configure_logging(
         formatter = "colored"
     else:
         formatter = "plain"
-    processors, named = logging_processors(enable_pretty_log)
+    processors, named = logging_processors(enable_pretty_log, mask_secrets=not sending_to_supervisor)
     timestamper = named["timestamper"]
 
     pre_chain: list[structlog.typing.Processor] = [
diff --git a/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py b/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
index 4872614daea..4735ddc048c 100644
--- a/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
+++ b/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
@@ -35,6 +35,7 @@ from airflow.sdk.execution_time.secrets_masker import (
     SecretsMasker,
     mask_secret,
     redact,
+    reset_secrets_masker,
     should_hide_value_for_key,
 )
 from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
@@ -357,6 +358,25 @@ class TestSecretsMasker:
             logger.info(conn.get_uri())
             assert "should_be_hidden" not in caplog.text
 
+    def test_reset_secrets_masker(
+        self,
+    ):
+        secrets_masker = SecretsMasker()
+        secrets_masker.add_mask("mask_this")
+        secrets_masker.add_mask("and_this")
+        secrets_masker.add_mask("maybe_this_too")
+
+        val = ["mask_this", "and_this", "maybe_this_too"]
+
+        with patch("airflow.sdk.execution_time.secrets_masker._secrets_masker", return_value=secrets_masker):
+            got = redact(val)
+            assert got == ["***"] * 3
+
+            reset_secrets_masker()
+
+            got = redact(val)
+            assert got == val
+
 
 class TestShouldHideValueForKey:
     @pytest.mark.parametrize(

Reply via email to