This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7a11046ca1efff9292ff9ea67ad03655bf316990 Author: Tzu-ping Chung <[email protected]> AuthorDate: Mon Jun 13 04:51:07 2022 +0800 Mask secrets in stdout for 'airflow tasks test' (#24362) A stdout redirector is implemented to mask all values to stdout and redact any secrets in it with the secrets masker. This redirector is applied to the 'airflow.task' logger. Co-authored-by: Alex Kennedy <[email protected]> (cherry picked from commit 3007159c2468f8e74476cc17573e03655ab168fa) --- airflow/cli/commands/task_command.py | 10 +++++---- airflow/utils/log/secrets_masker.py | 38 ++++++++++++++++++++++--------- tests/cli/commands/test_task_command.py | 40 ++++++++++++++++++++++----------- tests/utils/log/test_secrets_masker.py | 23 ++++++++++++++++++- 4 files changed, 83 insertions(+), 28 deletions(-) diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 2b743b91fe..a789779342 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -52,6 +52,7 @@ from airflow.utils.cli import ( ) from airflow.utils.dates import timezone from airflow.utils.log.logging_mixin import StreamLogWriter +from airflow.utils.log.secrets_masker import RedactedIO from airflow.utils.net import get_hostname from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import DagRunState @@ -539,10 +540,11 @@ def task_test(args, dag=None): ti, dr_created = _get_ti(task, args.execution_date_or_run_id, args.map_index, create_if_necessary="db") try: - if args.dry_run: - ti.dry_run() - else: - ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True) + with redirect_stdout(RedactedIO()): + if args.dry_run: + ti.dry_run() + else: + ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True) except Exception: if args.post_mortem: debugger = _guess_debugger() diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py index de038be48b..bde5141719 100644 --- a/airflow/utils/log/secrets_masker.py +++ b/airflow/utils/log/secrets_masker.py @@ -18,18 +18,17 @@ import collections import logging import re -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union +import sys +from typing import Any, Dict, Iterable, List, Optional, Set, TextIO, Tuple, TypeVar, Union from airflow import settings from airflow.compat.functools import cache, cached_property -if TYPE_CHECKING: - RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]] - +Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any]) +Redacted = Union[Redactable, str] log = logging.getLogger(__name__) - DEFAULT_SENSITIVE_FIELDS = frozenset( { 'access_token', @@ -91,14 +90,13 @@ def mask_secret(secret: Union[str, dict, Iterable], name: Optional[str] = None) _secrets_masker().add_mask(secret, name) -def redact(value: "RedactableItem", name: Optional[str] = None) -> "RedactableItem": +def redact(value: Redactable, name: Optional[str] = None) -> Redacted: """Redact any secrets found in ``value``.""" return _secrets_masker().redact(value, name) @cache def _secrets_masker() -> "SecretsMasker": - for flt in logging.getLogger('airflow.task').filters: if isinstance(flt, SecretsMasker): return flt @@ -177,7 +175,7 @@ class SecretsMasker(logging.Filter): return True - def _redact_all(self, item: "RedactableItem", depth: int) -> "RedactableItem": + def _redact_all(self, item: Redactable, depth: int) -> Redacted: if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str): return '***' if isinstance(item, dict): @@ -190,7 +188,7 @@ class SecretsMasker(logging.Filter): else: return item - def _redact(self, item: "RedactableItem", name: Optional[str], depth: int) -> "RedactableItem": + def _redact(self, item: Redactable, name: Optional[str], depth: int) -> Redacted: # Avoid spending too much effort on redacting on deeply nested # structures. This also avoid infinite recursion if a structure has # reference to self. @@ -231,7 +229,7 @@ class SecretsMasker(logging.Filter): ) return item - def redact(self, item: "RedactableItem", name: Optional[str] = None) -> "RedactableItem": + def redact(self, item: Redactable, name: Optional[str] = None) -> Redacted: """Redact an any secrets found in ``item``, if it is a string. If ``name`` is given, and it's a "sensitive" name (see @@ -258,3 +256,23 @@ class SecretsMasker(logging.Filter): elif isinstance(secret, collections.abc.Iterable): for v in secret: self.add_mask(v, name) + + +class RedactedIO(TextIO): + """IO class that redacts values going into stdout. + + Expected usage:: + + with contextlib.redirect_stdout(RedactedIO()): + ... # Writes to stdout will be redacted. + """ + + def __init__(self): + self.target = sys.stdout + + def write(self, s: str) -> int: + s = redact(s) + return self.target.write(s) + + def flush(self) -> None: + return self.target.flush() diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index deaf474200..8df5a17492 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -57,7 +57,7 @@ def reset(dag_id): # TODO: Check if tests needs side effects - locally there's missing DAG -class TestCliTasks(unittest.TestCase): +class TestCliTasks: run_id = 'TEST_RUN_ID' dag_id = 'example_python_operator' parser: ArgumentParser @@ -66,7 +66,7 @@ class TestCliTasks(unittest.TestCase): dag_run: DagRun @classmethod - def setUpClass(cls): + def setup_class(cls): cls.dagbag = DagBag(include_examples=True) cls.parser = cli_parser.get_parser() clear_db_runs() @@ -77,7 +77,7 @@ class TestCliTasks(unittest.TestCase): ) @classmethod - def tearDownClass(cls) -> None: + def teardown_class(cls) -> None: clear_db_runs() def test_cli_list_tasks(self): @@ -102,20 +102,34 @@ class TestCliTasks(unittest.TestCase): assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue() @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") - def test_test_with_existing_dag_run(self): + def test_test_with_existing_dag_run(self, caplog): """Test the `airflow test` command""" task_id = 'print_the_context' - args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()]) + with caplog.at_level("INFO", logger="airflow.task"): + task_command.task_test(args) + assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text + + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") + def test_test_filters_secrets(self, capsys): + """Test ``airflow test`` does not print secrets to stdout. + + Output should be filtered by SecretsMasker. + """ + password = "somepassword1234!" + logging.getLogger("airflow.task").filters[0].add_mask(password) + args = self.parser.parse_args( + ["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"], + ) - with self.assertLogs('airflow.task', level='INFO') as cm: + with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(password)): task_command.task_test(args) - assert any( - [ - f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in log - for log in cm.output - ] - ) + assert capsys.readouterr().out.endswith("***\n") + + not_password = "!4321drowssapemos" + with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(not_password)): + task_command.task_test(args) + assert capsys.readouterr().out.endswith(f"{not_password}\n") @mock.patch("airflow.cli.commands.task_command.LocalTaskJob") def test_run_with_existing_dag_run_id(self, mock_local_job): @@ -164,7 +178,7 @@ class TestCliTasks(unittest.TestCase): task0_id, run_id, ] - with self.assertRaises(DagRunNotFound): + with pytest.raises(DagRunNotFound): task_command.task_run(self.parser.parse_args(args0), dag=dag) def test_cli_test_with_params(self): diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py index edac9f49b7..ee0bc2eb98 100644 --- a/tests/utils/log/test_secrets_masker.py +++ b/tests/utils/log/test_secrets_masker.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import contextlib import inspect import logging import logging.config @@ -23,7 +24,7 @@ import textwrap import pytest from airflow import settings -from airflow.utils.log.secrets_masker import SecretsMasker, should_hide_value_for_key +from airflow.utils.log.secrets_masker import RedactedIO, SecretsMasker, should_hide_value_for_key from tests.test_utils.config import conf_vars settings.MASK_SECRETS_IN_LOGS = True @@ -341,3 +342,23 @@ class ShortExcFormatter(logging.Formatter): def lineno(): """Returns the current line number in our program.""" return inspect.currentframe().f_back.f_lineno + + +class TestRedactedIO: + def test_redacts_from_print(self, capsys): + # Without redacting, password is printed. + print(p) + stdout = capsys.readouterr().out + assert stdout == f"{p}\n" + assert "***" not in stdout + + # With context manager, password is redacted. + with contextlib.redirect_stdout(RedactedIO()): + print(p) + stdout = capsys.readouterr().out + assert stdout == "***\n" + + def test_write(self, capsys): + RedactedIO().write(p) + stdout = capsys.readouterr().out + assert stdout == "***"
