This is an automated email from the ASF dual-hosted git repository.
bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 908058951c5 Use exclude_defaults to prevent explicitly empty fields from being in the request (#57718)
908058951c5 is described below
commit 908058951c5e7e8c8c2b27c956ab9c69b122a432
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Dec 17 08:49:59 2025 +0100
Use exclude_defaults to prevent explicitly empty fields from being in the request (#57718)
* Remove the layer from the client and include it in the pydantic model for values that are not passed
---
airflow-ctl/src/airflowctl/api/operations.py | 72 ++++++++++++++--------
.../src/airflowctl/ctl/commands/dag_command.py | 3 +-
2 files changed, 49 insertions(+), 26 deletions(-)
diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py
index 31e0298a19d..d4e9e212db7 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -116,6 +116,7 @@ def _check_flag_and_exit_if_server_response_error(func):
return response
def wrapped(self, *args, **kwargs):
+ # Set highest level error handler to each method with decorator
try:
if self.exit_in_error:
return _exit_if_server_response_error(response=func(self,
*args, **kwargs))
@@ -165,7 +166,7 @@ class BaseOperations:
total_entries = first_pass.total_entries # type: ignore[attr-defined]
if total_entries < limit:
return first_pass
- for key, value in first_pass.model_dump().items():
+ for key, value in
first_pass.model_dump(exclude_defaults=False).items():
if key != "total_entries" and isinstance(value, list):
break
entry_list = getattr(first_pass, key)
@@ -176,7 +177,7 @@ class BaseOperations:
offset = offset + limit
entry_list.extend(getattr(entry, key))
obj = data_model(**{key: entry_list, "total_entries": total_entries})
- return data_model.model_validate(obj.model_dump())
+ return
data_model.model_validate(obj.model_dump(exclude_defaults=False))
# Login operations
@@ -190,7 +191,9 @@ class LoginOperations:
"""Login to the API server."""
try:
return LoginResponse.model_validate_json(
- self.client.post("/token/cli",
json=login.model_dump(mode="json")).content
+ self.client.post(
+ "/token/cli", json=login.model_dump(mode="json",
exclude_defaults=False)
+ ).content
)
except ServerResponseError as e:
raise e
@@ -230,9 +233,9 @@ class AssetsOperations(BaseOperations):
"""Create an asset event."""
try:
# Ensure extra is initialised before sent to API
- if asset_event_body.extra is None:
- asset_event_body.extra = {}
- self.response = self.client.post("assets/events",
json=asset_event_body.model_dump(mode="json"))
+ self.response = self.client.post(
+ "assets/events", json=asset_event_body.model_dump(mode="json",
exclude_defaults=False)
+ )
return
AssetEventResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -302,7 +305,9 @@ class BackfillOperations(BaseOperations):
def create(self, backfill: BackfillPostBody) -> BackfillResponse |
ServerResponseError:
"""Create a backfill."""
try:
- self.response = self.client.post("backfills",
data=backfill.model_dump(mode="json"))
+ self.response = self.client.post(
+ "backfills", json=backfill.model_dump(mode="json",
exclude_defaults=False)
+ )
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -310,7 +315,9 @@ class BackfillOperations(BaseOperations):
def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse |
ServerResponseError:
"""Create a dry run backfill."""
try:
- self.response = self.client.post("backfills/dry_run",
data=backfill.model_dump(mode="json"))
+ self.response = self.client.post(
+ "backfills/dry_run", json=backfill.model_dump(mode="json",
exclude_defaults=False)
+ )
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -331,7 +338,7 @@ class BackfillOperations(BaseOperations):
def pause(self, backfill_id: str) -> BackfillResponse |
ServerResponseError:
"""Pause a backfill."""
try:
- self.response = self.client.post(f"backfills/{backfill_id}/pause")
+ self.response = self.client.put(f"backfills/{backfill_id}/pause")
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -339,7 +346,7 @@ class BackfillOperations(BaseOperations):
def unpause(self, backfill_id: str) -> BackfillResponse |
ServerResponseError:
"""Unpause a backfill."""
try:
- self.response =
self.client.post(f"backfills/{backfill_id}/unpause")
+ self.response = self.client.put(f"backfills/{backfill_id}/unpause")
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -347,7 +354,7 @@ class BackfillOperations(BaseOperations):
def cancel(self, backfill_id: str) -> BackfillResponse |
ServerResponseError:
"""Cancel a backfill."""
try:
- self.response = self.client.post(f"backfills/{backfill_id}/cancel")
+ self.response = self.client.put(f"backfills/{backfill_id}/cancel")
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -394,7 +401,9 @@ class ConnectionsOperations(BaseOperations):
) -> ConnectionResponse | ServerResponseError:
"""Create a connection."""
try:
- self.response = self.client.post("connections",
json=connection.model_dump(mode="json"))
+ self.response = self.client.post(
+ "connections", json=connection.model_dump(mode="json",
exclude_defaults=False)
+ )
return
ConnectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -402,7 +411,9 @@ class ConnectionsOperations(BaseOperations):
def bulk(self, connections: BulkBodyConnectionBody) -> BulkResponse |
ServerResponseError:
"""CRUD multiple connections."""
try:
- self.response = self.client.patch("connections",
json=connections.model_dump(mode="json"))
+ self.response = self.client.patch(
+ "connections", json=connections.model_dump(mode="json",
exclude_defaults=False)
+ )
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -430,7 +441,8 @@ class ConnectionsOperations(BaseOperations):
"""Update a connection."""
try:
self.response = self.client.patch(
- f"connections/{connection.connection_id}",
json=connection.model_dump(mode="json")
+ f"connections/{connection.connection_id}",
+ json=connection.model_dump(mode="json",
exclude_defaults=False),
)
return
ConnectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
@@ -442,7 +454,9 @@ class ConnectionsOperations(BaseOperations):
) -> ConnectionTestResponse | ServerResponseError:
"""Test a connection."""
try:
- self.response = self.client.post("connections/test",
json=connection.model_dump(mode="json"))
+ self.response = self.client.post(
+ "connections/test", json=connection.model_dump(mode="json",
exclude_defaults=False)
+ )
return
ConnectionTestResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -477,7 +491,9 @@ class DagsOperations(BaseOperations):
def update(self, dag_id: str, dag_body: DAGPatchBody) -> DAGResponse |
ServerResponseError:
try:
- self.response = self.client.patch(f"dags/{dag_id}",
json=dag_body.model_dump(mode="json"))
+ self.response = self.client.patch(
+ f"dags/{dag_id}", json=dag_body.model_dump(mode="json",
exclude_defaults=False)
+ )
return DAGResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -525,11 +541,9 @@ class DagsOperations(BaseOperations):
self, dag_id: str, trigger_dag_run: TriggerDAGRunPostBody
) -> DAGRunResponse | ServerResponseError:
"""Create a dag run."""
- if trigger_dag_run.conf is None:
- trigger_dag_run.conf = {}
try:
self.response = self.client.post(
- f"dags/{dag_id}/dagRuns",
json=trigger_dag_run.model_dump(mode="json")
+ f"dags/{dag_id}/dagRuns",
json=trigger_dag_run.model_dump(mode="json", exclude_defaults=False)
)
return DAGRunResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
@@ -597,7 +611,9 @@ class PoolsOperations(BaseOperations):
def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError:
"""Create a pool."""
try:
- self.response = self.client.post("pools",
json=pool.model_dump(mode="json"))
+ self.response = self.client.post(
+ "pools", json=pool.model_dump(mode="json",
exclude_defaults=False)
+ )
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -605,7 +621,9 @@ class PoolsOperations(BaseOperations):
def bulk(self, pools: BulkBodyPoolBody) -> BulkResponse |
ServerResponseError:
"""CRUD multiple pools."""
try:
- self.response = self.client.patch("pools",
json=pools.model_dump(mode="json"))
+ self.response = self.client.patch(
+ "pools", json=pools.model_dump(mode="json",
exclude_defaults=False)
+ )
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -622,7 +640,7 @@ class PoolsOperations(BaseOperations):
"""Update a pool."""
try:
self.response = self.client.patch(
- f"pools/{pool_body.pool}",
json=pool_body.model_dump(mode="json")
+ f"pools/{pool_body.pool}",
json=pool_body.model_dump(mode="json", exclude_defaults=False)
)
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
@@ -655,7 +673,9 @@ class VariablesOperations(BaseOperations):
def create(self, variable: VariableBody) -> VariableResponse |
ServerResponseError:
"""Create a variable."""
try:
- self.response = self.client.post("variables",
json=variable.model_dump(mode="json"))
+ self.response = self.client.post(
+ "variables", json=variable.model_dump(mode="json",
exclude_defaults=False)
+ )
return VariableResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -663,7 +683,9 @@ class VariablesOperations(BaseOperations):
def bulk(self, variables: BulkBodyVariableBody) -> BulkResponse |
ServerResponseError:
"""CRUD multiple variables."""
try:
- self.response = self.client.patch("variables",
json=variables.model_dump(mode="json"))
+ self.response = self.client.patch(
+ "variables", json=variables.model_dump(mode="json",
exclude_defaults=False)
+ )
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -680,7 +702,7 @@ class VariablesOperations(BaseOperations):
"""Update a variable."""
try:
self.response = self.client.patch(
- f"variables/{variable.key}",
json=variable.model_dump(mode="json")
+ f"variables/{variable.key}",
json=variable.model_dump(mode="json", exclude_defaults=False)
)
return VariableResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
diff --git a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py
index 9b43be47eb2..0391d934ecb 100644
--- a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py
+++ b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py
@@ -22,8 +22,9 @@ from typing import Literal
import rich
-from airflowctl.api.client import NEW_API_CLIENT, ClientKind,
ServerResponseError, provide_api_client
+from airflowctl.api.client import NEW_API_CLIENT, ClientKind,
provide_api_client
from airflowctl.api.datamodels.generated import DAGPatchBody
+from airflowctl.api.operations import ServerResponseError
from airflowctl.ctl.console_formatting import AirflowConsole