This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ed80816316b Fix schema alias handling in ConnectionResult (#46957)
ed80816316b is described below
commit ed80816316b0640a75a1722f1c657d9746fa7956
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Feb 24 19:39:15 2025 +0530
Fix schema alias handling in ConnectionResult (#46957)
---
airflow/dag_processing/processor.py | 2 +-
task_sdk/src/airflow/sdk/execution_time/comms.py | 4 +++-
task_sdk/src/airflow/sdk/execution_time/supervisor.py | 2 +-
task_sdk/tests/execution_time/test_supervisor.py | 9 +++++++++
4 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 7360f0b7c7a..bba6b6857d8 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -277,7 +277,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
conn = self.client.connections.get(msg.conn_id)
if isinstance(conn, ConnectionResponse):
conn_result = ConnectionResult.from_conn_response(conn)
- resp = conn_result.model_dump_json(exclude_unset=True).encode()
+ resp = conn_result.model_dump_json(exclude_unset=True, by_alias=True).encode()
else:
resp = conn.model_dump_json().encode()
elif isinstance(msg, GetVariable):
diff --git a/task_sdk/src/airflow/sdk/execution_time/comms.py b/task_sdk/src/airflow/sdk/execution_time/comms.py
index e0e96ba5158..b0c6dd62abc 100644
--- a/task_sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task_sdk/src/airflow/sdk/execution_time/comms.py
@@ -139,7 +139,9 @@ class ConnectionResult(ConnectionResponse):
# Exclude defaults to avoid sending unnecessary data
# Pass the type as ConnectionResult explicitly so we can then call model_dump_json with exclude_unset=True
# to avoid sending unset fields (which are defaults in our case).
- return cls(**connection_response.model_dump(exclude_defaults=True), type="ConnectionResult")
+ return cls(
+ **connection_response.model_dump(exclude_defaults=True, by_alias=True), type="ConnectionResult"
+ )
class VariableResult(VariableResponse):
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 0cbd850ae39..ca95a7003f4 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -787,7 +787,7 @@ class ActivitySubprocess(WatchedSubprocess):
conn = self.client.connections.get(msg.conn_id)
if isinstance(conn, ConnectionResponse):
conn_result = ConnectionResult.from_conn_response(conn)
- resp = conn_result.model_dump_json(exclude_unset=True).encode()
+ resp = conn_result.model_dump_json(exclude_unset=True, by_alias=True).encode()
else:
resp = conn.model_dump_json().encode()
elif isinstance(msg, GetVariable):
diff --git a/task_sdk/tests/execution_time/test_supervisor.py b/task_sdk/tests/execution_time/test_supervisor.py
index 61899dca3eb..ececcbe31d8 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -930,6 +930,15 @@ class TestHandleRequest:
ConnectionResult(conn_id="test_conn", conn_type="mysql"),
id="get_connection",
),
+ pytest.param(
+ GetConnection(conn_id="test_conn"),
+ b'{"conn_id":"test_conn","conn_type":"mysql","schema":"mysql","type":"ConnectionResult"}\n',
+ "connections.get",
+ ("test_conn",),
+ {},
+ ConnectionResult(conn_id="test_conn", conn_type="mysql", schema="mysql"), # type: ignore[call-arg]
+ id="get_connection_with_alias",
+ ),
pytest.param(
GetVariable(key="test_key"),
b'{"key":"test_key","value":"test_value","type":"VariableResult"}\n',