This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0df7ff1f748 Mask sensitive values from conn & variables in task runner & DAG parsing (#48834)
0df7ff1f748 is described below
commit 0df7ff1f748fd736345ee5fd1d5318d3caf054b0
Author: Amogh Desai <[email protected]>
AuthorDate: Sun Apr 6 11:52:44 2025 +0530
Mask sensitive values from conn & variables in task runner & DAG parsing (#48834)
---
airflow-core/src/airflow/models/connection.py | 8 +++++++-
airflow-core/src/airflow/models/variable.py | 6 +++++-
airflow-core/tests/unit/hooks/test_base.py | 2 ++
airflow-core/tests/unit/models/test_connection.py | 18 ++++++++++--------
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 7 +++++++
5 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py
index aedc33b5af5..221b4fba994 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -470,7 +470,13 @@ class Connection(Base, LoggingMixin):
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
try:
- return TaskSDKConnection.get(conn_id=conn_id)
+ conn = TaskSDKConnection.get(conn_id=conn_id)
+ if isinstance(conn, TaskSDKConnection):
+ if conn.password:
+ mask_secret(conn.password)
+ if conn.extra:
+ mask_secret(conn.extra)
+ return conn
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
log.debug("Unable to retrieve connection from
MetastoreBackend using Task SDK")
diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py
index d2532e1203e..e0b91686285 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -154,11 +154,15 @@ class Variable(Base, LoggingMixin):
from airflow.sdk import Variable as TaskSDKVariable
from airflow.sdk.definitions._internal.types import NOTSET
- return TaskSDKVariable.get(
+ var_val = TaskSDKVariable.get(
key,
default=NOTSET if default_var is cls.__NO_DEFAULT_SENTINEL
else default_var,
deserialize_json=deserialize_json,
)
+ if isinstance(var_val, str):
+ mask_secret(var_val, key)
+
+ return var_val
var_val = Variable.get_variable_from_secrets(key=key)
if var_val is None:
diff --git a/airflow-core/tests/unit/hooks/test_base.py b/airflow-core/tests/unit/hooks/test_base.py
index a5cca2e1e41..0cd6d05dc8f 100644
--- a/airflow-core/tests/unit/hooks/test_base.py
+++ b/airflow-core/tests/unit/hooks/test_base.py
@@ -67,6 +67,8 @@ class TestBaseHook:
hook = BaseHook(logger_name="")
hook.get_connection(conn_id="test_conn")
+ print(mock_supervisor_comms.send_request.call_args_list)
+
mock_supervisor_comms.send_request.assert_called_once_with(
msg=GetConnection(conn_id="test_conn"), log=mock.ANY
)
diff --git a/airflow-core/tests/unit/models/test_connection.py b/airflow-core/tests/unit/models/test_connection.py
index e4650820a65..653869785e4 100644
--- a/airflow-core/tests/unit/models/test_connection.py
+++ b/airflow-core/tests/unit/models/test_connection.py
@@ -278,20 +278,22 @@ class TestConnection:
"headers": {"Content-Type": "application/json", "X-Requested-By":
"Airflow"},
}
- @mock.patch("airflow.sdk.Connection")
- def test_get_connection_from_secrets_task_sdk_success(self,
mock_task_sdk_connection):
+ @mock.patch("airflow.sdk.Connection.get")
+ def test_get_connection_from_secrets_task_sdk_success(self, mock_get):
"""Test the get_connection_from_secrets method with Task SDK success
path."""
- # Mock sys.modules for task_runner
+ from airflow.sdk import Connection as SDKConnection
+
+ expected_connection = SDKConnection(conn_id="test_conn",
conn_type="test_type")
+ mock_get.return_value = expected_connection
+
mock_task_runner = mock.MagicMock()
mock_task_runner.SUPERVISOR_COMMS = True
- expected_connection = Connection(conn_id="test_conn",
conn_type="test_type")
- mock_task_sdk_connection.get.return_value = expected_connection
-
with mock.patch.dict(sys.modules,
{"airflow.sdk.execution_time.task_runner": mock_task_runner}):
result = Connection.get_connection_from_secrets("test_conn")
- assert result.conn_id == expected_connection.conn_id
- assert result.conn_type == expected_connection.conn_type
+
+ assert result.conn_id == "test_conn"
+ assert result.conn_type == "test_type"
@mock.patch("airflow.sdk.Connection")
def test_get_connection_from_secrets_task_sdk_not_found(self,
mock_task_sdk_connection):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index c924e11322c..932fcfbb1f8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -99,6 +99,7 @@ from airflow.sdk.execution_time.comms import (
XComCountResponse,
XComResult,
)
+from airflow.sdk.execution_time.secrets_masker import mask_secret
if TYPE_CHECKING:
from structlog.typing import FilteringBoundLogger, WrappedLogger
@@ -915,6 +916,10 @@ class ActivitySubprocess(WatchedSubprocess):
elif isinstance(msg, GetConnection):
conn = self.client.connections.get(msg.conn_id)
if isinstance(conn, ConnectionResponse):
+ if conn.password:
+ mask_secret(conn.password)
+ if conn.extra:
+ mask_secret(conn.extra)
conn_result = ConnectionResult.from_conn_response(conn)
resp = conn_result.model_dump_json(exclude_unset=True,
by_alias=True).encode()
else:
@@ -922,6 +927,8 @@ class ActivitySubprocess(WatchedSubprocess):
elif isinstance(msg, GetVariable):
var = self.client.variables.get(msg.key)
if isinstance(var, VariableResponse):
+ if var.value:
+ mask_secret(var.value)
var_result = VariableResult.from_variable_response(var)
resp = var_result.model_dump_json(exclude_unset=True).encode()
else: