This is an automated email from the ASF dual-hosted git repository.
bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 32053486119 Amend compatibility issues for airflowctl (#63388)
32053486119 is described below
commit 32053486119a330ebe7f1fd5e45f205e67bfec22
Author: Bugra Ozturk <[email protected]>
AuthorDate: Thu Mar 12 18:43:50 2026 +0100
Amend compatibility issues for airflowctl (#63388)
* Fix compatibility issues for airflowctl
---
airflow-ctl/src/airflowctl/api/operations.py | 99 +++++++++++++++++++++-------
1 file changed, 74 insertions(+), 25 deletions(-)
diff --git a/airflow-ctl/src/airflowctl/api/operations.py
b/airflow-ctl/src/airflowctl/api/operations.py
index 445d490c08d..64424eff9c5 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -19,11 +19,11 @@ from __future__ import annotations
import datetime
import json
-from typing import TYPE_CHECKING, Any, TypeVar
+from typing import TYPE_CHECKING, Any, TypeVar, get_args
import httpx
import structlog
-from pydantic import BaseModel
+from pydantic import BaseModel, ValidationError
from airflowctl.api.datamodels.auth_generated import LoginBody, LoginResponse
from airflowctl.api.datamodels.generated import (
@@ -133,6 +133,43 @@ def _check_flag_and_exit_if_server_response_error(func):
return wrapped
+TYPE_DEFAULTS = {
+ bool: False,
+ int: 0,
+ float: 0.0,
+ str: "",
+ list: [],
+ dict: {},
+}
+
+
+def get_field_default(annotation) -> Any:
+ args = get_args(annotation)
+ if args:
+ non_none = [a for a in args if a is not type(None)]
+ if non_none:
+ return get_field_default(non_none[0])
+ return TYPE_DEFAULTS.get(annotation, None)
+
+
+def fill_missing_fields(data: dict, model: type[BaseModel]) -> dict:
+ for field_name, field_info in model.model_fields.items():
+ annotation = field_info.annotation
+ args = get_args(annotation)
+ if field_name not in data and field_info.is_required():
+ data[field_name] = get_field_default(annotation)
+ elif field_name in data and isinstance(data[field_name], dict):
+ if isinstance(annotation, type) and issubclass(annotation,
BaseModel):
+ data[field_name] = fill_missing_fields(data[field_name],
annotation)
+ elif field_name in data and isinstance(data[field_name], list) and
args:
+ if isinstance(args[0], type) and issubclass(args[0], BaseModel):
+ data[field_name] = [
+ fill_missing_fields(item, args[0]) if isinstance(item,
dict) else item
+ for item in data[field_name]
+ ]
+ return data
+
+
class BaseOperations:
"""
Base class for operations.
@@ -155,33 +192,35 @@ class BaseOperations:
if callable(value):
setattr(cls, attr,
_check_flag_and_exit_if_server_response_error(value))
- def execute_list(
- self,
- *,
- path: str,
- data_model: type[T],
- offset: int = 0,
- limit: int = 50,
- params: dict | None = None,
- ) -> T | ServerResponseError:
+ def execute_list(self, *, path, data_model, offset=0, limit=50,
params=None):
shared_params = {"limit": limit, **(params or {})}
+
+ def safe_validate(content: bytes) -> BaseModel:
+ try:
+ return data_model.model_validate_json(content) # type:
ignore[union-attr]
+ except ValidationError:
+ raw = fill_missing_fields(json.loads(content), data_model)
+ return data_model.model_validate(raw) # type:
ignore[union-attr]
+
self.response = self.client.get(path, params=shared_params)
- first_pass = data_model.model_validate_json(self.response.content)
+ first_pass = safe_validate(self.response.content)
total_entries = first_pass.total_entries # type: ignore[attr-defined]
if total_entries < limit:
return first_pass
+ found_key = None
for key, value in first_pass.model_dump().items():
if key != "total_entries" and isinstance(value, list):
+ found_key = key
break
- entry_list = getattr(first_pass, key)
+ entry_list = getattr(first_pass, found_key)
offset = offset + limit
while offset < total_entries:
self.response = self.client.get(path, params={**shared_params,
"offset": offset})
- entry = data_model.model_validate_json(self.response.content)
+ entry = safe_validate(self.response.content)
offset = offset + limit
- entry_list.extend(getattr(entry, key))
- obj = data_model(**{key: entry_list, "total_entries": total_entries})
- return data_model.model_validate(obj.model_dump())
+ entry_list.extend(getattr(entry, found_key))
+ obj = data_model(**{found_key: entry_list, "total_entries":
total_entries})
+ return data_model.model_validate(obj.model_dump()) # type:
ignore[union-attr]
# Login operations
@@ -237,7 +276,9 @@ class AssetsOperations(BaseOperations):
# Ensure extra is initialised before sent to API
if asset_event_body.extra is None:
asset_event_body.extra = {}
- self.response = self.client.post("assets/events",
json=asset_event_body.model_dump(mode="json"))
+ self.response = self.client.post(
+ "assets/events", json=asset_event_body.model_dump(mode="json",
exclude_none=True)
+ )
return
AssetEventResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -307,7 +348,9 @@ class BackfillOperations(BaseOperations):
def create(self, backfill: BackfillPostBody) -> BackfillResponse |
ServerResponseError:
"""Create a backfill."""
try:
- self.response = self.client.post("backfills",
data=backfill.model_dump(mode="json"))
+ self.response = self.client.post(
+ "backfills", data=backfill.model_dump(mode="json",
exclude_none=True)
+ )
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -315,7 +358,9 @@ class BackfillOperations(BaseOperations):
def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse |
ServerResponseError:
"""Create a dry run backfill."""
try:
- self.response = self.client.post("backfills/dry_run",
data=backfill.model_dump(mode="json"))
+ self.response = self.client.post(
+ "backfills/dry_run", data=backfill.model_dump(mode="json",
exclude_none=True)
+ )
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -399,7 +444,9 @@ class ConnectionsOperations(BaseOperations):
) -> ConnectionResponse | ServerResponseError:
"""Create a connection."""
try:
- self.response = self.client.post("connections",
json=connection.model_dump(mode="json"))
+ self.response = self.client.post(
+ "connections", json=connection.model_dump(mode="json",
exclude_none=True)
+ )
return
ConnectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -618,7 +665,7 @@ class PoolsOperations(BaseOperations):
def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError:
"""Create a pool."""
try:
- self.response = self.client.post("pools",
json=pool.model_dump(mode="json"))
+ self.response = self.client.post("pools",
json=pool.model_dump(mode="json", exclude_none=True))
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -676,7 +723,9 @@ class VariablesOperations(BaseOperations):
def create(self, variable: VariableBody) -> VariableResponse |
ServerResponseError:
"""Create a variable."""
try:
- self.response = self.client.post("variables",
json=variable.model_dump(mode="json"))
+ self.response = self.client.post(
+ "variables", json=variable.model_dump(mode="json",
exclude_none=True)
+ )
return VariableResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -786,7 +835,7 @@ class XComOperations(BaseOperations):
try:
self.response = self.client.post(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
- json=body.model_dump(mode="json", exclude_unset=True),
+ json=body.model_dump(mode="json", exclude_unset=True,
exclude_none=True),
)
return
XComResponseNative.model_validate_json(self.response.content)
except ServerResponseError as e:
@@ -814,7 +863,7 @@ class XComOperations(BaseOperations):
try:
self.response = self.client.patch(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
- json=body.model_dump(mode="json", exclude_unset=True),
+ json=body.model_dump(mode="json", exclude_unset=True,
exclude_none=True),
)
return
XComResponseNative.model_validate_json(self.response.content)
except ServerResponseError as e: