This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0df7ff1f748 Mask sensitive values from conn & variables in task runner & DAG parsing (#48834)
0df7ff1f748 is described below

commit 0df7ff1f748fd736345ee5fd1d5318d3caf054b0
Author: Amogh Desai <[email protected]>
AuthorDate: Sun Apr 6 11:52:44 2025 +0530

    Mask sensitive values from conn & variables in task runner & DAG parsing (#48834)
---
 airflow-core/src/airflow/models/connection.py         |  8 +++++++-
 airflow-core/src/airflow/models/variable.py           |  6 +++++-
 airflow-core/tests/unit/hooks/test_base.py            |  2 ++
 airflow-core/tests/unit/models/test_connection.py     | 18 ++++++++++--------
 task-sdk/src/airflow/sdk/execution_time/supervisor.py |  7 +++++++
 5 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py
index aedc33b5af5..221b4fba994 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -470,7 +470,13 @@ class Connection(Base, LoggingMixin):
             from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
 
             try:
-                return TaskSDKConnection.get(conn_id=conn_id)
+                conn = TaskSDKConnection.get(conn_id=conn_id)
+                if isinstance(conn, TaskSDKConnection):
+                    if conn.password:
+                        mask_secret(conn.password)
+                    if conn.extra:
+                        mask_secret(conn.extra)
+                return conn
             except AirflowRuntimeError as e:
                 if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
                     log.debug("Unable to retrieve connection from MetastoreBackend using Task SDK")
diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py
index d2532e1203e..e0b91686285 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -154,11 +154,15 @@ class Variable(Base, LoggingMixin):
             from airflow.sdk import Variable as TaskSDKVariable
             from airflow.sdk.definitions._internal.types import NOTSET
 
-            return TaskSDKVariable.get(
+            var_val = TaskSDKVariable.get(
                 key,
                 default=NOTSET if default_var is cls.__NO_DEFAULT_SENTINEL else default_var,
                 deserialize_json=deserialize_json,
             )
+            if isinstance(var_val, str):
+                mask_secret(var_val, key)
+
+            return var_val
 
         var_val = Variable.get_variable_from_secrets(key=key)
         if var_val is None:
diff --git a/airflow-core/tests/unit/hooks/test_base.py b/airflow-core/tests/unit/hooks/test_base.py
index a5cca2e1e41..0cd6d05dc8f 100644
--- a/airflow-core/tests/unit/hooks/test_base.py
+++ b/airflow-core/tests/unit/hooks/test_base.py
@@ -67,6 +67,8 @@ class TestBaseHook:
         hook = BaseHook(logger_name="")
         hook.get_connection(conn_id="test_conn")
 
+        print(mock_supervisor_comms.send_request.call_args_list)
+
         mock_supervisor_comms.send_request.assert_called_once_with(
             msg=GetConnection(conn_id="test_conn"), log=mock.ANY
         )
diff --git a/airflow-core/tests/unit/models/test_connection.py b/airflow-core/tests/unit/models/test_connection.py
index e4650820a65..653869785e4 100644
--- a/airflow-core/tests/unit/models/test_connection.py
+++ b/airflow-core/tests/unit/models/test_connection.py
@@ -278,20 +278,22 @@ class TestConnection:
             "headers": {"Content-Type": "application/json", "X-Requested-By": "Airflow"},
         }
 
-    @mock.patch("airflow.sdk.Connection")
-    def test_get_connection_from_secrets_task_sdk_success(self, mock_task_sdk_connection):
+    @mock.patch("airflow.sdk.Connection.get")
+    def test_get_connection_from_secrets_task_sdk_success(self, mock_get):
         """Test the get_connection_from_secrets method with Task SDK success path."""
-        # Mock sys.modules for task_runner
+        from airflow.sdk import Connection as SDKConnection
+
+        expected_connection = SDKConnection(conn_id="test_conn", conn_type="test_type")
+        mock_get.return_value = expected_connection
+
         mock_task_runner = mock.MagicMock()
         mock_task_runner.SUPERVISOR_COMMS = True
 
-        expected_connection = Connection(conn_id="test_conn", conn_type="test_type")
-        mock_task_sdk_connection.get.return_value = expected_connection
-
         with mock.patch.dict(sys.modules, {"airflow.sdk.execution_time.task_runner": mock_task_runner}):
             result = Connection.get_connection_from_secrets("test_conn")
-            assert result.conn_id == expected_connection.conn_id
-            assert result.conn_type == expected_connection.conn_type
+
+            assert result.conn_id == "test_conn"
+            assert result.conn_type == "test_type"
 
     @mock.patch("airflow.sdk.Connection")
     def test_get_connection_from_secrets_task_sdk_not_found(self, mock_task_sdk_connection):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index c924e11322c..932fcfbb1f8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -99,6 +99,7 @@ from airflow.sdk.execution_time.comms import (
     XComCountResponse,
     XComResult,
 )
+from airflow.sdk.execution_time.secrets_masker import mask_secret
 
 if TYPE_CHECKING:
     from structlog.typing import FilteringBoundLogger, WrappedLogger
@@ -915,6 +916,10 @@ class ActivitySubprocess(WatchedSubprocess):
         elif isinstance(msg, GetConnection):
             conn = self.client.connections.get(msg.conn_id)
             if isinstance(conn, ConnectionResponse):
+                if conn.password:
+                    mask_secret(conn.password)
+                if conn.extra:
+                    mask_secret(conn.extra)
                 conn_result = ConnectionResult.from_conn_response(conn)
                 resp = conn_result.model_dump_json(exclude_unset=True, by_alias=True).encode()
             else:
@@ -922,6 +927,8 @@ class ActivitySubprocess(WatchedSubprocess):
         elif isinstance(msg, GetVariable):
             var = self.client.variables.get(msg.key)
             if isinstance(var, VariableResponse):
+                if var.value:
+                    mask_secret(var.value)
                 var_result = VariableResult.from_variable_response(var)
                 resp = var_result.model_dump_json(exclude_unset=True).encode()
             else:

Reply via email to