This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7a11046ca1efff9292ff9ea67ad03655bf316990
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Jun 13 04:51:07 2022 +0800

    Mask secrets in stdout for 'airflow tasks test' (#24362)
    
    A stdout redirector is implemented to intercept everything written to
    stdout and redact any secrets in it with the secrets masker. The
    redirector wraps task execution in 'airflow tasks test' and uses the
    secrets masker attached to the 'airflow.task' logger.
    
    Co-authored-by: Alex Kennedy <[email protected]>
    (cherry picked from commit 3007159c2468f8e74476cc17573e03655ab168fa)
---
 airflow/cli/commands/task_command.py    | 10 +++++----
 airflow/utils/log/secrets_masker.py     | 38 ++++++++++++++++++++++---------
 tests/cli/commands/test_task_command.py | 40 ++++++++++++++++++++++-----------
 tests/utils/log/test_secrets_masker.py  | 23 ++++++++++++++++++-
 4 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 2b743b91fe..a789779342 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -52,6 +52,7 @@ from airflow.utils.cli import (
 )
 from airflow.utils.dates import timezone
 from airflow.utils.log.logging_mixin import StreamLogWriter
+from airflow.utils.log.secrets_masker import RedactedIO
 from airflow.utils.net import get_hostname
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
@@ -539,10 +540,11 @@ def task_test(args, dag=None):
     ti, dr_created = _get_ti(task, args.execution_date_or_run_id, args.map_index, create_if_necessary="db")
 
     try:
-        if args.dry_run:
-            ti.dry_run()
-        else:
-            ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+        with redirect_stdout(RedactedIO()):
+            if args.dry_run:
+                ti.dry_run()
+            else:
+                ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
     except Exception:
         if args.post_mortem:
             debugger = _guess_debugger()
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index de038be48b..bde5141719 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -18,18 +18,17 @@
 import collections
 import logging
 import re
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+import sys
+from typing import Any, Dict, Iterable, List, Optional, Set, TextIO, Tuple, TypeVar, Union
 
 from airflow import settings
 from airflow.compat.functools import cache, cached_property
 
-if TYPE_CHECKING:
-    RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]
-
+Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any])
+Redacted = Union[Redactable, str]
 
 log = logging.getLogger(__name__)
 
-
 DEFAULT_SENSITIVE_FIELDS = frozenset(
     {
         'access_token',
@@ -91,14 +90,13 @@ def mask_secret(secret: Union[str, dict, Iterable], name: Optional[str] = None)
     _secrets_masker().add_mask(secret, name)
 
 
-def redact(value: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
+def redact(value: Redactable, name: Optional[str] = None) -> Redacted:
     """Redact any secrets found in ``value``."""
     return _secrets_masker().redact(value, name)
 
 
 @cache
 def _secrets_masker() -> "SecretsMasker":
-
     for flt in logging.getLogger('airflow.task').filters:
         if isinstance(flt, SecretsMasker):
             return flt
@@ -177,7 +175,7 @@ class SecretsMasker(logging.Filter):
 
         return True
 
-    def _redact_all(self, item: "RedactableItem", depth: int) -> "RedactableItem":
+    def _redact_all(self, item: Redactable, depth: int) -> Redacted:
         if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
             return '***'
         if isinstance(item, dict):
@@ -190,7 +188,7 @@ class SecretsMasker(logging.Filter):
         else:
             return item
 
-    def _redact(self, item: "RedactableItem", name: Optional[str], depth: int) -> "RedactableItem":
+    def _redact(self, item: Redactable, name: Optional[str], depth: int) -> Redacted:
         # Avoid spending too much effort on redacting on deeply nested
         # structures. This also avoid infinite recursion if a structure has
         # reference to self.
@@ -231,7 +229,7 @@ class SecretsMasker(logging.Filter):
             )
             return item
 
-    def redact(self, item: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
+    def redact(self, item: Redactable, name: Optional[str] = None) -> Redacted:
         """Redact an any secrets found in ``item``, if it is a string.
 
         If ``name`` is given, and it's a "sensitive" name (see
@@ -258,3 +256,23 @@ class SecretsMasker(logging.Filter):
         elif isinstance(secret, collections.abc.Iterable):
             for v in secret:
                 self.add_mask(v, name)
+
+
+class RedactedIO(TextIO):
+    """IO class that redacts values going into stdout.
+
+    Expected usage::
+
+        with contextlib.redirect_stdout(RedactedIO()):
+            ...  # Writes to stdout will be redacted.
+    """
+
+    def __init__(self):
+        self.target = sys.stdout
+
+    def write(self, s: str) -> int:
+        s = redact(s)
+        return self.target.write(s)
+
+    def flush(self) -> None:
+        return self.target.flush()
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index deaf474200..8df5a17492 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -57,7 +57,7 @@ def reset(dag_id):
 
 
 # TODO: Check if tests needs side effects - locally there's missing DAG
-class TestCliTasks(unittest.TestCase):
+class TestCliTasks:
     run_id = 'TEST_RUN_ID'
     dag_id = 'example_python_operator'
     parser: ArgumentParser
@@ -66,7 +66,7 @@ class TestCliTasks(unittest.TestCase):
     dag_run: DagRun
 
     @classmethod
-    def setUpClass(cls):
+    def setup_class(cls):
         cls.dagbag = DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
         clear_db_runs()
@@ -77,7 +77,7 @@ class TestCliTasks(unittest.TestCase):
         )
 
     @classmethod
-    def tearDownClass(cls) -> None:
+    def teardown_class(cls) -> None:
         clear_db_runs()
 
     def test_cli_list_tasks(self):
@@ -102,20 +102,34 @@ class TestCliTasks(unittest.TestCase):
         assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()
 
     @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
-    def test_test_with_existing_dag_run(self):
+    def test_test_with_existing_dag_run(self, caplog):
         """Test the `airflow test` command"""
         task_id = 'print_the_context'
-
         args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
+        with caplog.at_level("INFO", logger="airflow.task"):
+            task_command.task_test(args)
+        assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text
+
+    @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
+    def test_test_filters_secrets(self, capsys):
+        """Test ``airflow test`` does not print secrets to stdout.
+
+        Output should be filtered by SecretsMasker.
+        """
+        password = "somepassword1234!"
+        logging.getLogger("airflow.task").filters[0].add_mask(password)
+        args = self.parser.parse_args(
+            ["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"],
+        )
 
-        with self.assertLogs('airflow.task', level='INFO') as cm:
+        with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(password)):
             task_command.task_test(args)
-            assert any(
-                [
-                    f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in log
-                    for log in cm.output
-                ]
-            )
+        assert capsys.readouterr().out.endswith("***\n")
+
+        not_password = "!4321drowssapemos"
+        with mock.patch("airflow.models.TaskInstance.run", new=lambda *_, **__: print(not_password)):
+            task_command.task_test(args)
+        assert capsys.readouterr().out.endswith(f"{not_password}\n")
 
     @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
     def test_run_with_existing_dag_run_id(self, mock_local_job):
@@ -164,7 +178,7 @@ class TestCliTasks(unittest.TestCase):
             task0_id,
             run_id,
         ]
-        with self.assertRaises(DagRunNotFound):
+        with pytest.raises(DagRunNotFound):
             task_command.task_run(self.parser.parse_args(args0), dag=dag)
 
     def test_cli_test_with_params(self):
diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py
index edac9f49b7..ee0bc2eb98 100644
--- a/tests/utils/log/test_secrets_masker.py
+++ b/tests/utils/log/test_secrets_masker.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import contextlib
 import inspect
 import logging
 import logging.config
@@ -23,7 +24,7 @@ import textwrap
 import pytest
 
 from airflow import settings
-from airflow.utils.log.secrets_masker import SecretsMasker, should_hide_value_for_key
+from airflow.utils.log.secrets_masker import RedactedIO, SecretsMasker, should_hide_value_for_key
 from tests.test_utils.config import conf_vars
 
 settings.MASK_SECRETS_IN_LOGS = True
@@ -341,3 +342,23 @@ class ShortExcFormatter(logging.Formatter):
 def lineno():
     """Returns the current line number in our program."""
     return inspect.currentframe().f_back.f_lineno
+
+
+class TestRedactedIO:
+    def test_redacts_from_print(self, capsys):
+        # Without redacting, password is printed.
+        print(p)
+        stdout = capsys.readouterr().out
+        assert stdout == f"{p}\n"
+        assert "***" not in stdout
+
+        # With context manager, password is redacted.
+        with contextlib.redirect_stdout(RedactedIO()):
+            print(p)
+        stdout = capsys.readouterr().out
+        assert stdout == "***\n"
+
+    def test_write(self, capsys):
+        RedactedIO().write(p)
+        stdout = capsys.readouterr().out
+        assert stdout == "***"

Reply via email to