This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 32053486119 Amend compatibility issues for airflowctl (#63388)
32053486119 is described below

commit 32053486119a330ebe7f1fd5e45f205e67bfec22
Author: Bugra Ozturk <[email protected]>
AuthorDate: Thu Mar 12 18:43:50 2026 +0100

    Amend compatibility issues for airflowctl (#63388)
    
    * Fix compatibility issues for airflowctl
---
 airflow-ctl/src/airflowctl/api/operations.py | 99 +++++++++++++++++++++-------
 1 file changed, 74 insertions(+), 25 deletions(-)

diff --git a/airflow-ctl/src/airflowctl/api/operations.py 
b/airflow-ctl/src/airflowctl/api/operations.py
index 445d490c08d..64424eff9c5 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -19,11 +19,11 @@ from __future__ import annotations
 
 import datetime
 import json
-from typing import TYPE_CHECKING, Any, TypeVar
+from typing import TYPE_CHECKING, Any, TypeVar, get_args
 
 import httpx
 import structlog
-from pydantic import BaseModel
+from pydantic import BaseModel, ValidationError
 
 from airflowctl.api.datamodels.auth_generated import LoginBody, LoginResponse
 from airflowctl.api.datamodels.generated import (
@@ -133,6 +133,43 @@ def _check_flag_and_exit_if_server_response_error(func):
     return wrapped
 
 
+TYPE_DEFAULTS = {
+    bool: False,
+    int: 0,
+    float: 0.0,
+    str: "",
+    list: [],
+    dict: {},
+}
+
+
+def get_field_default(annotation) -> Any:
+    args = get_args(annotation)
+    if args:
+        non_none = [a for a in args if a is not type(None)]
+        if non_none:
+            return get_field_default(non_none[0])
+    return TYPE_DEFAULTS.get(annotation, None)
+
+
+def fill_missing_fields(data: dict, model: type[BaseModel]) -> dict:
+    for field_name, field_info in model.model_fields.items():
+        annotation = field_info.annotation
+        args = get_args(annotation)
+        if field_name not in data and field_info.is_required():
+            data[field_name] = get_field_default(annotation)
+        elif field_name in data and isinstance(data[field_name], dict):
+            if isinstance(annotation, type) and issubclass(annotation, 
BaseModel):
+                data[field_name] = fill_missing_fields(data[field_name], 
annotation)
+        elif field_name in data and isinstance(data[field_name], list) and 
args:
+            if isinstance(args[0], type) and issubclass(args[0], BaseModel):
+                data[field_name] = [
+                    fill_missing_fields(item, args[0]) if isinstance(item, 
dict) else item
+                    for item in data[field_name]
+                ]
+    return data
+
+
 class BaseOperations:
     """
     Base class for operations.
@@ -155,33 +192,35 @@ class BaseOperations:
             if callable(value):
                 setattr(cls, attr, 
_check_flag_and_exit_if_server_response_error(value))
 
-    def execute_list(
-        self,
-        *,
-        path: str,
-        data_model: type[T],
-        offset: int = 0,
-        limit: int = 50,
-        params: dict | None = None,
-    ) -> T | ServerResponseError:
+    def execute_list(self, *, path, data_model, offset=0, limit=50, 
params=None):
         shared_params = {"limit": limit, **(params or {})}
+
+        def safe_validate(content: bytes) -> BaseModel:
+            try:
+                return data_model.model_validate_json(content)  # type: 
ignore[union-attr]
+            except ValidationError:
+                raw = fill_missing_fields(json.loads(content), data_model)
+                return data_model.model_validate(raw)  # type: 
ignore[union-attr]
+
         self.response = self.client.get(path, params=shared_params)
-        first_pass = data_model.model_validate_json(self.response.content)
+        first_pass = safe_validate(self.response.content)
         total_entries = first_pass.total_entries  # type: ignore[attr-defined]
         if total_entries < limit:
             return first_pass
+        found_key = None
         for key, value in first_pass.model_dump().items():
             if key != "total_entries" and isinstance(value, list):
+                found_key = key
                 break
-        entry_list = getattr(first_pass, key)
+        entry_list = getattr(first_pass, found_key)
         offset = offset + limit
         while offset < total_entries:
             self.response = self.client.get(path, params={**shared_params, 
"offset": offset})
-            entry = data_model.model_validate_json(self.response.content)
+            entry = safe_validate(self.response.content)
             offset = offset + limit
-            entry_list.extend(getattr(entry, key))
-        obj = data_model(**{key: entry_list, "total_entries": total_entries})
-        return data_model.model_validate(obj.model_dump())
+            entry_list.extend(getattr(entry, found_key))
+        obj = data_model(**{found_key: entry_list, "total_entries": 
total_entries})
+        return data_model.model_validate(obj.model_dump())  # type: 
ignore[union-attr]
 
 
 # Login operations
@@ -237,7 +276,9 @@ class AssetsOperations(BaseOperations):
             # Ensure extra is initialised before sent to API
             if asset_event_body.extra is None:
                 asset_event_body.extra = {}
-            self.response = self.client.post("assets/events", 
json=asset_event_body.model_dump(mode="json"))
+            self.response = self.client.post(
+                "assets/events", json=asset_event_body.model_dump(mode="json", 
exclude_none=True)
+            )
             return 
AssetEventResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -307,7 +348,9 @@ class BackfillOperations(BaseOperations):
     def create(self, backfill: BackfillPostBody) -> BackfillResponse | 
ServerResponseError:
         """Create a backfill."""
         try:
-            self.response = self.client.post("backfills", 
data=backfill.model_dump(mode="json"))
+            self.response = self.client.post(
+                "backfills", data=backfill.model_dump(mode="json", 
exclude_none=True)
+            )
             return BackfillResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -315,7 +358,9 @@ class BackfillOperations(BaseOperations):
     def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse | 
ServerResponseError:
         """Create a dry run backfill."""
         try:
-            self.response = self.client.post("backfills/dry_run", 
data=backfill.model_dump(mode="json"))
+            self.response = self.client.post(
+                "backfills/dry_run", data=backfill.model_dump(mode="json", 
exclude_none=True)
+            )
             return BackfillResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -399,7 +444,9 @@ class ConnectionsOperations(BaseOperations):
     ) -> ConnectionResponse | ServerResponseError:
         """Create a connection."""
         try:
-            self.response = self.client.post("connections", 
json=connection.model_dump(mode="json"))
+            self.response = self.client.post(
+                "connections", json=connection.model_dump(mode="json", 
exclude_none=True)
+            )
             return 
ConnectionResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -618,7 +665,7 @@ class PoolsOperations(BaseOperations):
     def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError:
         """Create a pool."""
         try:
-            self.response = self.client.post("pools", 
json=pool.model_dump(mode="json"))
+            self.response = self.client.post("pools", 
json=pool.model_dump(mode="json", exclude_none=True))
             return PoolResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -676,7 +723,9 @@ class VariablesOperations(BaseOperations):
     def create(self, variable: VariableBody) -> VariableResponse | 
ServerResponseError:
         """Create a variable."""
         try:
-            self.response = self.client.post("variables", 
json=variable.model_dump(mode="json"))
+            self.response = self.client.post(
+                "variables", json=variable.model_dump(mode="json", 
exclude_none=True)
+            )
             return VariableResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -786,7 +835,7 @@ class XComOperations(BaseOperations):
         try:
             self.response = self.client.post(
                 
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
-                json=body.model_dump(mode="json", exclude_unset=True),
+                json=body.model_dump(mode="json", exclude_unset=True, 
exclude_none=True),
             )
             return 
XComResponseNative.model_validate_json(self.response.content)
         except ServerResponseError as e:
@@ -814,7 +863,7 @@ class XComOperations(BaseOperations):
         try:
             self.response = self.client.patch(
                 
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
-                json=body.model_dump(mode="json", exclude_unset=True),
+                json=body.model_dump(mode="json", exclude_unset=True, 
exclude_none=True),
             )
             return 
XComResponseNative.model_validate_json(self.response.content)
         except ServerResponseError as e:

Reply via email to