This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3761024cb1d Ensure a `WatchedSubprocess` redacts only its values and
not parent processes' (#48527)
3761024cb1d is described below
commit 3761024cb1d45ca6a0f788ecb828cfd305c5cffc
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Mar 29 18:31:01 2025 +0530
Ensure a `WatchedSubprocess` redacts only its values and not parent
processes' (#48527)
---
airflow-core/src/airflow/dag_processing/manager.py | 4 ++++
.../src/airflow/jobs/triggerer_job_runner.py | 4 ++++
.../src/airflow/sdk/execution_time/secrets_masker.py | 18 ++++++++++++++++++
.../src/airflow/sdk/execution_time/supervisor.py | 12 +++---------
task-sdk/src/airflow/sdk/log.py | 10 +++++-----
.../task_sdk/definitions/test_secrets_masker.py | 20 ++++++++++++++++++++
6 files changed, 54 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 1dee1ce61dd..6b84eb442d1 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -233,6 +233,10 @@ class DagFileProcessorManager(LoggingMixin):
By processing them in separate processes, we can get parallelism and
isolation
from potentially harmful user code.
"""
+ from airflow.sdk.execution_time.secrets_masker import
reset_secrets_masker
+
+ reset_secrets_masker()
+
self.register_exit_signals()
self.log.info("Processing files using up to %s processes at a time ",
self._parallelism)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index bcf0bb9bb96..30c7a64ed44 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -316,6 +316,10 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
def run(self) -> None:
"""Run synchronously and handle all database reads/writes."""
+ from airflow.sdk.execution_time.secrets_masker import
reset_secrets_masker
+
+ reset_secrets_masker()
+
while not self.stop:
if not self.is_alive():
log.error("Trigger runner process has died! Exiting.")
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
index f5b30b3362b..7089291ed91 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
@@ -130,6 +130,19 @@ def _secrets_masker() -> SecretsMasker:
)
+def reset_secrets_masker() -> None:
+ """
+ Reset the secrets masker to clear existing patterns and replacer.
+
+ This utility ensures that an execution environment starts with a fresh
masker,
+ preventing any carry-over of patterns or replacer from a previous execution
or parent processes.
+
+ New processor types should invoke this method when setting up their own
masking to avoid
+ inheriting masking rules from existing execution environments.
+ """
+ _secrets_masker().reset_masker()
+
+
@cache
def _get_v1_env_var_type() -> type:
try:
@@ -351,6 +364,11 @@ class SecretsMasker(logging.Filter):
for v in secret:
self.add_mask(v, name)
+ def reset_masker(self):
+ """Reset the patterns and the replacer in the masker instance."""
+ self.patterns = set()
+ self.replacer = None
+
class RedactedIO(TextIO):
"""
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index f8272e39c75..133e97ce9a5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1078,14 +1078,6 @@ def ensure_secrets_backend_loaded() ->
list[BaseSecretsBackend]:
return
ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
-def register_secrets_masker():
- """Register the secrets masker to mask task logs."""
- from airflow.sdk.execution_time.secrets_masker import
get_sensitive_variables_fields, mask_secret
-
- for field in get_sensitive_variables_fields():
- mask_secret(field)
-
-
def supervise(
*,
ti: TaskInstance,
@@ -1113,6 +1105,8 @@ def supervise(
:return: Exit code of the process.
"""
# One or the other
+ from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker
+
if not client and ((not server) ^ dry_run):
raise ValueError(f"Can only specify one of {server=} or {dry_run=}")
@@ -1145,7 +1139,7 @@ def supervise(
ensure_secrets_backend_loaded()
- register_secrets_masker()
+ reset_secrets_masker()
process = ActivitySubprocess.start(
dag_rel_path=dag_rel_path,
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 75e8a28b816..bfaf5002468 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -133,9 +133,7 @@ class StdBinaryStreamHandler(logging.StreamHandler):
@cache
-def logging_processors(
- enable_pretty_log: bool,
-):
+def logging_processors(enable_pretty_log: bool, mask_secrets: bool = True):
if enable_pretty_log:
timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d
%H:%M:%S.%f")
else:
@@ -148,10 +146,12 @@ def logging_processors(
structlog.stdlib.PositionalArgumentsFormatter(),
logger_name,
redact_jwt,
- mask_logs,
structlog.processors.StackInfoRenderer(),
]
+ if mask_secrets:
+ processors.append(mask_logs)
+
# Imports to suppress showing code from these modules. We need the import
to get the filepath for
# structlog to ignore.
import contextlib
@@ -255,7 +255,7 @@ def configure_logging(
formatter = "colored"
else:
formatter = "plain"
- processors, named = logging_processors(enable_pretty_log)
+ processors, named = logging_processors(enable_pretty_log, mask_secrets=not
sending_to_supervisor)
timestamper = named["timestamper"]
pre_chain: list[structlog.typing.Processor] = [
diff --git a/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
b/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
index 4872614daea..4735ddc048c 100644
--- a/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
+++ b/task-sdk/tests/task_sdk/definitions/test_secrets_masker.py
@@ -35,6 +35,7 @@ from airflow.sdk.execution_time.secrets_masker import (
SecretsMasker,
mask_secret,
redact,
+ reset_secrets_masker,
should_hide_value_for_key,
)
from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
@@ -357,6 +358,25 @@ class TestSecretsMasker:
logger.info(conn.get_uri())
assert "should_be_hidden" not in caplog.text
+ def test_reset_secrets_masker(
+ self,
+ ):
+ secrets_masker = SecretsMasker()
+ secrets_masker.add_mask("mask_this")
+ secrets_masker.add_mask("and_this")
+ secrets_masker.add_mask("maybe_this_too")
+
+ val = ["mask_this", "and_this", "maybe_this_too"]
+
+ with
patch("airflow.sdk.execution_time.secrets_masker._secrets_masker",
return_value=secrets_masker):
+ got = redact(val)
+ assert got == ["***"] * 3
+
+ reset_secrets_masker()
+
+ got = redact(val)
+ assert got == val
+
class TestShouldHideValueForKey:
@pytest.mark.parametrize(