This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d597ed99712 Add update_mask support for bulk PATCH APIs (#54597)
d597ed99712 is described below

commit d597ed997128ae9f26487ba3ca212a4318a36581
Author: Brunda10 <[email protected]>
AuthorDate: Tue Sep 30 20:48:28 2025 +0530

    Add update_mask support for bulk PATCH APIs (#54597)
    
    * Added update_mask handling inside the 'update' action of bulk endpoints 
(e.g., Pools, Connections, Task Instances, XComs).
    
    * Added update_mask handling inside the 'update' action of bulk endpoints 
(e.g., Pools, Connections, Task Instances, XComs).
    
    * update-mask: Removed hardcoded value as a example
    
    * Supported Common Patch Utility Func
    
    * xUpdate Mask with Query Parameter suggestion
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Merge Conflicts
    
    * commited datamodels suggestion
    
    Co-authored-by: Kalyan R <[email protected]>
    
    * commited datamodels suggestion
    
    Co-authored-by: Kalyan R <[email protected]>
    
    * Removed Print statement
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Removed Print statement
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Removed Print statement
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Removed Print statement
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    
    * Refactor update_orm_from_pydantic-pools to accept session and perform 
entity selection
    
    * Merge Conflicts
    
    * Update INTHEWILD.md (#55051)
    
    Added DrMartens to the list
    
    * Regenerate AirflowCTL datamodels
    
    * Update INTHEWILD.md
    
    * Merge Conflicts
    
    * HTIL Changes Reverted Asper suggestion
    
    * HTIL Changes Reverted Asper suggestion
    
    * Xcom and Hitl file changes reverted
    
    * Formatting extra spaces
    
    * Formatting of the description
    
    * Open-API spec updated
    
    * Open API Spec Updated
    
    * Added validation to prevent updates on restricted fields
    
    * Suggested Change - Extra Space Added
    
    * rebased(due to static checks failure)
    
    ---------
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    Co-authored-by: Kalyan R <[email protected]>
    Co-authored-by: sprasad97 <[email protected]>
---
 .../api_fastapi/core_api/datamodels/common.py      |   8 +
 .../core_api/openapi/v2-rest-api-generated.yaml    |  40 +++++
 .../api_fastapi/core_api/routes/public/pools.py    |  34 +---
 .../core_api/routes/public/variables.py            |  33 +---
 .../api_fastapi/core_api/services/public/common.py |  48 ++++++
 .../api_fastapi/core_api/services/public/pools.py  |  89 +++++++++--
 .../core_api/services/public/variables.py          |  60 ++++++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  60 +++++++
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  16 ++
 .../core_api/routes/public/test_connections.py     |  48 ++++++
 .../core_api/routes/public/test_pools.py           | 177 +++++++++++++++++++--
 .../core_api/routes/public/test_variables.py       | 107 +++++++++++++
 .../src/airflowctl/api/datamodels/generated.py     |  28 ++++
 13 files changed, 653 insertions(+), 95 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
index 1fa8fda41a6..32eb2c232ae 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
@@ -77,6 +77,14 @@ class BulkUpdateAction(BulkBaseAction[T]):
 
     action: Literal[BulkAction.UPDATE] = Field(description="The action to be 
performed on the entities.")
     entities: list[T] = Field(..., description="A list of entities to be 
updated.")
+    update_mask: list[str] | None = Field(
+        default=None,
+        description=(
+            "A list of field names to update for each entity."
+            "Only these fields will be applied from the request body to the 
database model."
+            "Any extra fields provided will be ignored."
+        ),
+    )
     action_on_non_existence: BulkActionNotOnExistence = 
BulkActionNotOnExistence.FAIL
 
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 90cfc65b772..cebced02a32 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9376,6 +9376,16 @@ components:
           type: array
           title: Entities
           description: A list of entities to be updated.
+        update_mask:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Update Mask
+          description: A list of field names to update for each entity.Only 
these
+            fields will be applied from the request body to the database 
model.Any
+            extra fields provided will be ignored.
         action_on_non_existence:
           $ref: '#/components/schemas/BulkActionNotOnExistence'
           default: fail
@@ -9398,6 +9408,16 @@ components:
           type: array
           title: Entities
           description: A list of entities to be updated.
+        update_mask:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Update Mask
+          description: A list of field names to update for each entity.Only 
these
+            fields will be applied from the request body to the database 
model.Any
+            extra fields provided will be ignored.
         action_on_non_existence:
           $ref: '#/components/schemas/BulkActionNotOnExistence'
           default: fail
@@ -9420,6 +9440,16 @@ components:
           type: array
           title: Entities
           description: A list of entities to be updated.
+        update_mask:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Update Mask
+          description: A list of field names to update for each entity.Only 
these
+            fields will be applied from the request body to the database 
model.Any
+            extra fields provided will be ignored.
         action_on_non_existence:
           $ref: '#/components/schemas/BulkActionNotOnExistence'
           default: fail
@@ -9442,6 +9472,16 @@ components:
           type: array
           title: Entities
           description: A list of entities to be updated.
+        update_mask:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Update Mask
+          description: A list of field names to update for each entity.Only 
these
+            fields will be applied from the request body to the database 
model.Any
+            extra fields provided will be ignored.
         action_on_non_existence:
           $ref: '#/components/schemas/BulkActionNotOnExistence'
           default: fail
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
index 6a7e2f646d9..0646b6af5cf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
@@ -19,8 +19,6 @@ from __future__ import annotations
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Query, status
-from fastapi.exceptions import RequestValidationError
-from pydantic import ValidationError
 from sqlalchemy import delete, select
 
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -33,7 +31,6 @@ from airflow.api_fastapi.common.parameters import (
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.common import BulkBody, 
BulkResponse
 from airflow.api_fastapi.core_api.datamodels.pools import (
-    BasePool,
     PoolBody,
     PoolCollectionResponse,
     PoolPatchBody,
@@ -45,7 +42,7 @@ from airflow.api_fastapi.core_api.security import (
     requires_access_pool,
     requires_access_pool_bulk,
 )
-from airflow.api_fastapi.core_api.services.public.pools import BulkPoolService
+from airflow.api_fastapi.core_api.services.public.pools import 
BulkPoolService, update_orm_from_pydantic
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.models.pool import Pool
 
@@ -150,35 +147,8 @@ def patch_pool(
             status.HTTP_400_BAD_REQUEST,
             "Invalid body, pool name from request body doesn't match uri 
parameter",
         )
-    # Only slots and include_deferred can be modified in 'default_pool'
-    if pool_name == Pool.DEFAULT_POOL_NAME:
-        if update_mask and all(mask.strip() in {"slots", "include_deferred"} 
for mask in update_mask):
-            pass
-        else:
-            raise HTTPException(
-                status.HTTP_400_BAD_REQUEST,
-                "Only slots and included_deferred can be modified on Default 
Pool",
-            )
-    pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
-    if not pool:
-        raise HTTPException(
-            status.HTTP_404_NOT_FOUND, detail=f"The Pool with name: 
`{pool_name}` was not found"
-        )
-
-    fields_to_update = patch_body.model_fields_set
-    if update_mask:
-        fields_to_update = fields_to_update.intersection(update_mask)
-        data = patch_body.model_dump(include=fields_to_update, by_alias=True)
-    else:
-        data = patch_body.model_dump(include=fields_to_update, by_alias=True)
-        try:
-            BasePool.model_validate(data)
-        except ValidationError as e:
-            raise RequestValidationError(errors=e.errors())
-
-    for key, value in data.items():
-        setattr(pool, key, value)
 
+    pool = update_orm_from_pydantic(pool_name, patch_body, update_mask, 
session)
     return pool
 
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
index 36fc5be44b1..d2b5a294acf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
@@ -19,8 +19,6 @@ from __future__ import annotations
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Query, status
-from fastapi.exceptions import RequestValidationError
-from pydantic import ValidationError
 from sqlalchemy import select
 
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -43,7 +41,10 @@ from airflow.api_fastapi.core_api.security import (
     requires_access_variable,
     requires_access_variable_bulk,
 )
-from airflow.api_fastapi.core_api.services.public.variables import 
BulkVariableService
+from airflow.api_fastapi.core_api.services.public.variables import (
+    BulkVariableService,
+    update_orm_from_pydantic,
+)
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.models.variable import Variable
 
@@ -142,31 +143,7 @@ def patch_variable(
     update_mask: list[str] | None = Query(None),
 ) -> VariableResponse:
     """Update a variable by key."""
-    if patch_body.key != variable_key:
-        raise HTTPException(
-            status.HTTP_400_BAD_REQUEST, "Invalid body, key from request body 
doesn't match uri parameter"
-        )
-    non_update_fields = {"key"}
-    variable = 
session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
-    if not variable:
-        raise HTTPException(
-            status.HTTP_404_NOT_FOUND, f"The Variable with key: 
`{variable_key}` was not found"
-        )
-
-    fields_to_update = patch_body.model_fields_set
-    if update_mask:
-        fields_to_update = fields_to_update.intersection(update_mask)
-    else:
-        try:
-            VariableBody(**patch_body.model_dump())
-        except ValidationError as e:
-            raise RequestValidationError(errors=e.errors())
-
-    data = patch_body.model_dump(include=fields_to_update - non_update_fields, 
by_alias=True)
-
-    for key, val in data.items():
-        setattr(variable, key, val)
-
+    variable = update_orm_from_pydantic(variable_key, patch_body, update_mask, 
session)
     return variable
 
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
index 94e1157d1c0..37d3e5e08df 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
@@ -20,6 +20,9 @@ from __future__ import annotations
 from abc import ABC, abstractmethod
 from typing import Generic
 
+from fastapi import HTTPException, status
+from pydantic import BaseModel
+from sqlalchemy.ext.declarative import DeclarativeMeta
 from sqlalchemy.orm import Session
 
 from airflow.api_fastapi.core_api.datamodels.common import (
@@ -72,3 +75,48 @@ class BulkService(Generic[T], ABC):
     def handle_bulk_delete(self, action: BulkDeleteAction[T], results: 
BulkActionResponse) -> None:
         """Bulk delete entities."""
         raise NotImplementedError
+
+    @staticmethod
+    def apply_patch_with_update_mask(
+        model: DeclarativeMeta,
+        patch_body: BaseModel,
+        update_mask: list[str] | None,
+        non_update_fields: set[str] | None = None,
+    ) -> DeclarativeMeta:
+        """
+        Apply a patch to the given model using the provided update mask.
+
+        :param model: The SQLAlchemy model instance to update.
+        :param patch_body: Pydantic model containing patch data.
+        :param update_mask: Optional list of fields to update.
+        :param non_update_fields: Fields that should not be updated.
+        :return: The updated SQLAlchemy model instance.
+        :raises HTTPException: If invalid fields are provided in update_mask.
+        """
+        # Always dump without aliases for internal validation
+        raw_data = patch_body.model_dump(by_alias=False)
+        fields_to_update = set(patch_body.model_fields_set)
+
+        non_update_fields = non_update_fields or set()
+
+        if update_mask:
+            restricted_in_mask = 
set(update_mask).intersection(non_update_fields)
+            if restricted_in_mask:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=f"Update not allowed: the following fields are 
immutable and cannot be modified: {restricted_in_mask}",
+                )
+            fields_to_update = fields_to_update.intersection(update_mask)
+
+        if non_update_fields:
+            fields_to_update = fields_to_update - non_update_fields
+
+        validated_data = {key: raw_data[key] for key in fields_to_update if 
key in raw_data}
+
+        data = patch_body.model_dump(include=set(validated_data.keys()), 
by_alias=True)
+
+        # Update the model with the validated data
+        for key, value in data.items():
+            setattr(model, key, value)
+
+        return model
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index 98e16cbdccc..cc2d6696b5c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -18,9 +18,11 @@
 from __future__ import annotations
 
 from fastapi import HTTPException, status
+from fastapi.exceptions import RequestValidationError
 from pydantic import ValidationError
 from sqlalchemy import select
 
+from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkActionNotOnExistence,
     BulkActionOnExistence,
@@ -30,12 +32,79 @@ from airflow.api_fastapi.core_api.datamodels.common import (
     BulkUpdateAction,
 )
 from airflow.api_fastapi.core_api.datamodels.pools import (
+    BasePool,
     PoolBody,
+    PoolPatchBody,
 )
 from airflow.api_fastapi.core_api.services.public.common import BulkService
 from airflow.models.pool import Pool
 
 
+def update_orm_from_pydantic(
+    pool_name: str,
+    patch_body: PoolBody | PoolPatchBody,
+    update_mask: list[str] | None,
+    session: SessionDep,
+) -> Pool:
+    """
+    Update an existing pool.
+
+    :param pool_name: The name of the existing Pool to be updated.
+    :param patch_body: Pydantic model containing the fields to update.
+    :param update_mask: Specific fields to update. If None, all provided 
fields will be considered.
+    :param session: The database session dependency.
+    :return: The updated Pool instance.
+    :raises HTTPException: If attempting to update disallowed fields on 
``default_pool``.
+    """
+    # Special restriction: default pool only allows limited fields to be 
patched
+    pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
+    if not pool:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND, detail=f"The Pool with name: 
`{pool_name}` was not found"
+        )
+    if pool_name == Pool.DEFAULT_POOL_NAME:
+        if update_mask and all(mask.strip() in {"slots", "include_deferred"} 
for mask in update_mask):
+            # Validate only slots/include_deferred
+            try:
+                patch_body_subset = patch_body.model_dump(
+                    include={"slots", "include_deferred"}, exclude_unset=True, 
by_alias=True
+                )
+                # Re-run validation with BasePool but only on allowed fields
+                PoolPatchBody.model_validate(patch_body_subset)
+            except ValidationError as e:
+                raise RequestValidationError(errors=e.errors())
+        else:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                "Only slots and included_deferred can be modified on Default 
Pool",
+            )
+    else:
+        fields_to_update = patch_body.model_fields_set
+        try:
+            # Dump with both input + output aliases handled
+            body_dict = patch_body.model_dump(
+                include=fields_to_update,
+                by_alias=True,  # ensures we get the API-facing alias keys
+            )
+
+            # Normalize keys for BasePool (expects "pool")
+            if "name" in body_dict and "pool" not in body_dict:
+                body_dict["pool"] = body_dict.pop("name")
+
+            BasePool.model_validate(body_dict)
+
+        except ValidationError as e:
+            raise RequestValidationError(errors=e.errors())
+
+    # Delegate patch application to the common utility
+    return BulkService.apply_patch_with_update_mask(
+        model=pool,
+        patch_body=patch_body,
+        update_mask=update_mask,
+        non_update_fields=None,
+    )
+
+
 class BulkPoolService(BulkService[PoolBody]):
     """Service for handling bulk operations on pools."""
 
@@ -90,7 +159,6 @@ class BulkPoolService(BulkService[PoolBody]):
         """Bulk Update pools."""
         to_update_pool_names = {pool.pool for pool in action.entities}
         _, matched_pool_names, not_found_pool_names = 
self.categorize_pools(to_update_pool_names)
-
         try:
             if action.action_on_non_existence == BulkActionNotOnExistence.FAIL 
and not_found_pool_names:
                 raise HTTPException(
@@ -101,24 +169,13 @@ class BulkPoolService(BulkService[PoolBody]):
                 update_pool_names = matched_pool_names
             else:
                 update_pool_names = to_update_pool_names
-
             for pool in action.entities:
-                if pool.pool in update_pool_names:
-                    old_pool = 
self.session.scalar(select(Pool).filter(Pool.pool == pool.pool).limit(1))
-
-                    data = {
-                        key: val for key, val in 
pool.model_dump(by_alias=True).items() if val is not None
-                    }
-                    try:
-                        PoolBody(**data)
-
-                        for key, val in data.items():
-                            setattr(old_pool, key, val)
+                if pool.pool not in update_pool_names:
+                    continue
 
-                        results.success.append(str(pool.pool))
+                pool = update_orm_from_pydantic(pool.pool, pool, 
action.update_mask, self.session)
 
-                    except ValidationError as e:
-                        results.errors.append({"error": f"{e.errors()}"})
+                results.success.append(str(pool.pool))  # use request field, 
always consistent
 
         except HTTPException as e:
             results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
index 0208ea1a0a5..4c049ca6e78 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
@@ -18,9 +18,11 @@
 from __future__ import annotations
 
 from fastapi import HTTPException, status
+from fastapi.exceptions import RequestValidationError
 from pydantic import ValidationError
 from sqlalchemy import select
 
+from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkActionNotOnExistence,
     BulkActionOnExistence,
@@ -36,6 +38,52 @@ from airflow.api_fastapi.core_api.services.public.common 
import BulkService
 from airflow.models.variable import Variable
 
 
+def update_orm_from_pydantic(
+    variable_key: str, patch_body: VariableBody, update_mask: list[str] | 
None, session: SessionDep
+) -> Variable:
+    """
+    Update an existing Variable.
+
+    :param variable_key: The name of the existing Variable_key to update.
+    :param patch_body: The patch request body containing fields to update.
+    :param update_mask: List of fields to update. If None, all provided fields 
will be updated.
+    :param session: The database session dependency.
+    :return: The updated Variable object.
+    :raises HTTPException: If attempting to update restricted fields (e.g., 
``key``).
+    """
+    # Key field is immutable → cannot be patched
+
+    if patch_body.key != variable_key:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST, "Invalid body, key from request body 
doesn't match uri parameter"
+        )
+    old_variable = 
session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
+    if not old_variable:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND, f"The Variable with key: 
`{variable_key}` was not found"
+        )
+
+    try:
+        VariableBody(**patch_body.model_dump())
+    except ValidationError as e:
+        raise RequestValidationError(errors=e.errors())
+    non_update_fields = {"key"}
+
+    if patch_body.key != old_variable.key:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            "Invalid body, key from request body doesn't match uri parameter",
+        )
+
+    # Apply patch via utility
+    return BulkService.apply_patch_with_update_mask(
+        model=old_variable,
+        patch_body=patch_body,
+        update_mask=update_mask,
+        non_update_fields=non_update_fields,
+    )
+
+
 class BulkVariableService(BulkService[VariableBody]):
     """Service for handling bulk operations on variables."""
 
@@ -81,7 +129,6 @@ class BulkVariableService(BulkService[VariableBody]):
         """Bulk Update variables."""
         to_update_keys = {variable.key for variable in action.entities}
         matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
-
         try:
             if action.action_on_non_existence == BulkActionNotOnExistence.FAIL 
and not_found_keys:
                 raise HTTPException(
@@ -94,14 +141,11 @@ class BulkVariableService(BulkService[VariableBody]):
                 update_keys = to_update_keys
 
             for variable in action.entities:
-                if variable.key in update_keys:
-                    old_variable = 
self.session.scalar(select(Variable).filter_by(key=variable.key).limit(1))
-                    VariableBody(**variable.model_dump())
-                    data = variable.model_dump(exclude={"key"}, by_alias=True)
+                if variable.key not in update_keys:
+                    continue
+                variable = update_orm_from_pydantic(variable.key, variable, 
action.update_mask, self.session)
 
-                    for key, val in data.items():
-                        setattr(old_variable, key, val)
-                    results.success.append(variable.key)
+                results.success.append(variable.key)
 
         except HTTPException as e:
             results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 36273f8818a..c21385737a0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1063,6 +1063,21 @@ export const $BulkUpdateAction_BulkTaskInstanceBody_ = {
             title: 'Entities',
             description: 'A list of entities to be updated.'
         },
+        update_mask: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Update Mask',
+            description: 'A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.'
+        },
         action_on_non_existence: {
             '$ref': '#/components/schemas/BulkActionNotOnExistence',
             default: 'fail'
@@ -1090,6 +1105,21 @@ export const $BulkUpdateAction_ConnectionBody_ = {
             title: 'Entities',
             description: 'A list of entities to be updated.'
         },
+        update_mask: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Update Mask',
+            description: 'A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.'
+        },
         action_on_non_existence: {
             '$ref': '#/components/schemas/BulkActionNotOnExistence',
             default: 'fail'
@@ -1117,6 +1147,21 @@ export const $BulkUpdateAction_PoolBody_ = {
             title: 'Entities',
             description: 'A list of entities to be updated.'
         },
+        update_mask: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Update Mask',
+            description: 'A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.'
+        },
         action_on_non_existence: {
             '$ref': '#/components/schemas/BulkActionNotOnExistence',
             default: 'fail'
@@ -1144,6 +1189,21 @@ export const $BulkUpdateAction_VariableBody_ = {
             title: 'Entities',
             description: 'A list of entities to be updated.'
         },
+        update_mask: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Update Mask',
+            description: 'A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.'
+        },
         action_on_non_existence: {
             '$ref': '#/components/schemas/BulkActionNotOnExistence',
             default: 'fail'
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 58a082807a2..4d04524d320 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -341,6 +341,10 @@ export type BulkUpdateAction_BulkTaskInstanceBody_ = {
      * A list of entities to be updated.
      */
     entities: Array<BulkTaskInstanceBody>;
+    /**
+     * A list of field names to update for each entity.Only these fields will 
be applied from the request body to the database model.Any extra fields 
provided will be ignored.
+     */
+    update_mask?: Array<(string)> | null;
     action_on_non_existence?: BulkActionNotOnExistence;
 };
 
@@ -353,6 +357,10 @@ export type BulkUpdateAction_ConnectionBody_ = {
      * A list of entities to be updated.
      */
     entities: Array<ConnectionBody>;
+    /**
+     * A list of field names to update for each entity.Only these fields will 
be applied from the request body to the database model.Any extra fields 
provided will be ignored.
+     */
+    update_mask?: Array<(string)> | null;
     action_on_non_existence?: BulkActionNotOnExistence;
 };
 
@@ -365,6 +373,10 @@ export type BulkUpdateAction_PoolBody_ = {
      * A list of entities to be updated.
      */
     entities: Array<PoolBody>;
+    /**
+     * A list of field names to update for each entity.Only these fields will 
be applied from the request body to the database model.Any extra fields 
provided will be ignored.
+     */
+    update_mask?: Array<(string)> | null;
     action_on_non_existence?: BulkActionNotOnExistence;
 };
 
@@ -377,6 +389,10 @@ export type BulkUpdateAction_VariableBody_ = {
      * A list of entities to be updated.
      */
     entities: Array<VariableBody>;
+    /**
+     * A list of field names to update for each entity.Only these fields will 
be applied from the request body to the database model.Any extra fields 
provided will be ignored.
+     */
+    update_mask?: Array<(string)> | null;
     action_on_non_existence?: BulkActionNotOnExistence;
 };
 
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index b8f60a86762..76224542a2b 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -1138,6 +1138,26 @@ class TestBulkConnections(TestConnectionEndpoint):
                 },
                 id="test_update_with_fail",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": "updated_description",
+                                }
+                            ],
+                            "update_mask": ["description"],
+                            "action_on_non_existence": "fail",
+                        }
+                    ]
+                },
+                {"update": {"success": [TEST_CONN_ID], "errors": []}},
+                id="test_connection_update_with_valid_update_mask",
+            ),
             pytest.param(
                 {
                     "actions": [
@@ -1243,6 +1263,34 @@ class TestBulkConnections(TestConnectionEndpoint):
                 },
                 id="test_create_update_delete",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": "updated_description",
+                                }
+                            ],
+                            "update_mask": ["description"],
+                            "action_on_non_existence": "fail",
+                        },
+                        {
+                            "action": "delete",
+                            "entities": [TEST_CONN_ID],
+                            "action_on_non_existence": "fail",
+                        },
+                    ]
+                },
+                {
+                    "update": {"success": [TEST_CONN_ID], "errors": []},
+                    "delete": {"success": [TEST_CONN_ID], "errors": []},
+                },
+                id="test_connection_create_update_delete_with_update_mask",
+            ),
         ],
     )
     def test_bulk_connections(self, test_client, actions, expected_results, 
session):
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index d4d16abfb8f..4c2ff3f9d77 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -283,7 +283,7 @@ class TestPatchPool(TestPoolsEndpoint):
             (
                 Pool.DEFAULT_POOL_NAME,
                 {"update_mask": ["slots"]},
-                {"slots": 150},
+                {"slots": 150, "name": Pool.DEFAULT_POOL_NAME, 
"include_deferred": True},
                 200,
                 {
                     "deferred_slots": 0,
@@ -584,7 +584,14 @@ class TestBulkPools(TestPoolsEndpoint):
                     "actions": [
                         {
                             "action": "update",
-                            "entities": [{"name": "pool2", "slots": 10, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool2",
+                                    "slots": 10,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "fail",
                         }
                     ]
@@ -597,7 +604,14 @@ class TestBulkPools(TestPoolsEndpoint):
                     "actions": [
                         {
                             "action": "update",
-                            "entities": [{"name": "pool4", "slots": 20, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool4",
+                                    "slots": 20,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "skip",
                         }
                     ]
@@ -610,7 +624,14 @@ class TestBulkPools(TestPoolsEndpoint):
                     "actions": [
                         {
                             "action": "update",
-                            "entities": [{"name": "pool4", "slots": 10, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool4",
+                                    "slots": 10,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "fail",
                         }
                     ]
@@ -628,6 +649,29 @@ class TestBulkPools(TestPoolsEndpoint):
                 },
                 id="test_update_not_found",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "name": "pool1",
+                                    "slots": 50,
+                                    "description": "Updated description",
+                                    "include_deferred": False,
+                                }
+                            ],
+                            "update_mask": ["slots", "description"],
+                            "action_on_non_existence": "fail",
+                        }
+                    ]
+                },
+                {
+                    "update": {"success": ["pool1"], "errors": []},
+                },
+                id="test_update_with_valid_update_mask",
+            ),
             pytest.param(
                 {"actions": [{"action": "delete", "entities": ["pool1"], 
"action_on_non_existence": "skip"}]},
                 {"delete": {"success": ["pool1"], "errors": []}},
@@ -663,7 +707,14 @@ class TestBulkPools(TestPoolsEndpoint):
                         },
                         {
                             "action": "update",
-                            "entities": [{"name": "pool1", "slots": 10, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool1",
+                                    "slots": 10,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "fail",
                         },
                         {"action": "delete", "entities": ["pool2"], 
"action_on_non_existence": "skip"},
@@ -686,7 +737,14 @@ class TestBulkPools(TestPoolsEndpoint):
                         },
                         {
                             "action": "update",
-                            "entities": [{"name": "pool1", "slots": 100, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool1",
+                                    "slots": 100,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "fail",
                         },
                         {"action": "delete", "entities": ["pool4"], 
"action_on_non_existence": "skip"},
@@ -717,7 +775,14 @@ class TestBulkPools(TestPoolsEndpoint):
                         },
                         {
                             "action": "update",
-                            "entities": [{"name": "pool5", "slots": 10, 
"description": "New Description"}],
+                            "entities": [
+                                {
+                                    "name": "pool5",
+                                    "slots": 10,
+                                    "description": "New Description",
+                                    "include_deferred": False,
+                                }
+                            ],
                             "action_on_non_existence": "skip",
                         },
                         {"action": "delete", "entities": ["pool5"], 
"action_on_non_existence": "skip"},
@@ -741,7 +806,12 @@ class TestBulkPools(TestPoolsEndpoint):
                         {
                             "action": "update",
                             "entities": [
-                                {"name": "pool5", "slots": 100, "description": 
"New test Description"}
+                                {
+                                    "name": "pool5",
+                                    "slots": 100,
+                                    "description": "New test Description",
+                                    "include_deferred": False,
+                                }
                             ],
                             "action_on_non_existence": "fail",
                         },
@@ -768,7 +838,12 @@ class TestBulkPools(TestPoolsEndpoint):
                         {
                             "action": "update",
                             "entities": [
-                                {"name": "pool1", "slots": 100, "description": 
"New test Description"}
+                                {
+                                    "name": "pool1",
+                                    "slots": 100,
+                                    "description": "New test Description",
+                                    "include_deferred": False,
+                                }
                             ],
                             "action_on_non_existence": "fail",
                         },
@@ -782,7 +857,12 @@ class TestBulkPools(TestPoolsEndpoint):
                         {
                             "action": "update",
                             "entities": [
-                                {"name": "pool8", "slots": 100, "description": 
"New test Description"}
+                                {
+                                    "name": "pool8",
+                                    "slots": 100,
+                                    "description": "New test Description",
+                                    "include_deferred": False,
+                                }
                             ],
                             "action_on_non_existence": "fail",
                         },
@@ -797,7 +877,12 @@ class TestBulkPools(TestPoolsEndpoint):
                         {
                             "action": "update",
                             "entities": [
-                                {"name": "pool9", "slots": 100, "description": 
"New test Description"}
+                                {
+                                    "name": "pool9",
+                                    "slots": 100,
+                                    "description": "New test Description",
+                                    "include_deferred": False,
+                                }
                             ],
                             "action_on_non_existence": "fail",
                         },
@@ -830,6 +915,41 @@ class TestBulkPools(TestPoolsEndpoint):
                 },
                 id="test_repeated_actions",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "entities": [{"name": "pool6", "slots": 5, 
"description": "Initial Description"}],
+                            "action_on_existence": "fail",
+                        },
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "name": "pool6",
+                                    "slots": 50,
+                                    "description": "Masked Update Description",
+                                    "include_deferred": False,
+                                }
+                            ],
+                            "update_mask": ["slots"],
+                            "action_on_non_existence": "fail",
+                        },
+                        {
+                            "action": "delete",
+                            "entities": ["pool6"],
+                            "action_on_non_existence": "fail",
+                        },
+                    ]
+                },
+                {
+                    "create": {"success": ["pool6"], "errors": []},
+                    "update": {"success": ["pool6"], "errors": []},
+                    "delete": {"success": ["pool6"], "errors": []},
+                },
+                id="test_dependent_actions_with_update_mask",
+            ),
         ],
     )
     def test_bulk_pools(self, test_client, actions, expected_results, session):
@@ -840,6 +960,41 @@ class TestBulkPools(TestPoolsEndpoint):
             assert response_data[key] == value
         check_last_log(session, dag_id=None, event="bulk_pools", 
logical_date=None)
 
+    def test_update_mask_preserves_other_fields(self, test_client, session):
+        # Arrange: create a pool with initial values
+        self.create_pools()
+
+        # Act: update only the "slots" field via update_mask
+        response = test_client.patch(
+            "/pools",
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [
+                            {
+                                "name": "pool1",
+                                "slots": 50,
+                                "description": "Should not be updated",
+                                "include_deferred": False,
+                            }
+                        ],
+                        "update_mask": ["slots"],  # only slots should update
+                        "action_on_non_existence": "fail",
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data["update"]["success"] == ["pool1"]
+
+        # Assert: fetch from DB and check only masked field changed
+        updated_pool = session.query(Pool).filter_by(pool="pool1").one()
+        assert updated_pool.slots == 50  #  updated
+        assert updated_pool.description is None  #  unchanged
+        assert updated_pool.include_deferred is True  #  unchanged
+
     def test_should_respond_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.patch("/pools", json={})
         assert response.status_code == 401
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
index 076905ea03b..399c8a6976f 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
@@ -745,6 +745,81 @@ class TestBulkVariables(TestVariableEndpoint):
                 },
                 id="test_update_not_found",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "key": "test_variable_key",
+                                    "value": "update_mask_value",
+                                    "description": "Updated variable 1",
+                                }
+                            ],
+                            "update_mask": ["value"],
+                            "action_on_non_existence": "fail",
+                        }
+                    ]
+                },
+                {"update": {"success": ["test_variable_key"], "errors": []}},
+                id="test_successful_update_mask",
+            ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "key": "test_variable_key",
+                                    "value": "new_value",
+                                    "description": "Updated description",
+                                }
+                            ],
+                            "update_mask": ["value"],
+                            "action_on_non_existence": "fail",
+                        }
+                    ]
+                },
+                {
+                    "update": {
+                        "success": ["test_variable_key"],
+                        "errors": [],
+                    }
+                },
+                id="test_variable_update_with_valid_update_mask",
+            ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "entities": [
+                                {
+                                    "key": "test_variable_key",
+                                    "value": "new_value",
+                                    "description": "Updated description",
+                                }
+                            ],
+                            "update_mask": ["key", "value"],
+                            "action_on_non_existence": "fail",
+                        }
+                    ]
+                },
+                {
+                    "update": {
+                        "success": [],
+                        "errors": [
+                            {
+                                "error": "Update not allowed: the following 
fields are immutable and cannot be modified: {'key'}",
+                                "status_code": 400,
+                            }
+                        ],
+                    }
+                },
+                id="test_bulk_update_should_fail_on_restricted_key",
+            ),
             pytest.param(
                 {
                     "actions": [
@@ -1064,6 +1139,38 @@ class TestBulkVariables(TestVariableEndpoint):
                 },
                 id="test_repeated_actions",
             ),
+            pytest.param(
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "entities": [
+                                {"key": "var1", "value": "initial", 
"description": "Initial Description"}
+                            ],
+                            "action_on_existence": "fail",
+                        },
+                        {
+                            "action": "update",
+                            "entities": [
+                                {"key": "var1", "value": "updated", 
"description": "Updated Description"}
+                            ],
+                            "update_mask": ["value"],
+                            "action_on_non_existence": "fail",
+                        },
+                        {
+                            "action": "delete",
+                            "entities": ["var1"],
+                            "action_on_non_existence": "fail",
+                        },
+                    ]
+                },
+                {
+                    "create": {"success": ["var1"], "errors": []},
+                    "update": {"success": ["var1"], "errors": []},
+                    "delete": {"success": ["var1"], "errors": []},
+                },
+                id="test_variable_dependent_actions_with_update_mask",
+            ),
         ],
     )
     def test_bulk_variables(self, test_client, actions, expected_results, 
session):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 6ea61b24cf8..b056e3c5912 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1164,6 +1164,13 @@ class BulkUpdateActionBulkTaskInstanceBody(BaseModel):
     entities: Annotated[
         list[BulkTaskInstanceBody], Field(description="A list of entities to 
be updated.", title="Entities")
     ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
     action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 
@@ -1177,6 +1184,13 @@ class BulkUpdateActionConnectionBody(BaseModel):
     entities: Annotated[
         list[ConnectionBody], Field(description="A list of entities to be 
updated.", title="Entities")
     ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
     action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 
@@ -1190,6 +1204,13 @@ class BulkUpdateActionPoolBody(BaseModel):
     entities: Annotated[
         list[PoolBody], Field(description="A list of entities to be updated.", 
title="Entities")
     ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
     action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 
@@ -1203,6 +1224,13 @@ class BulkUpdateActionVariableBody(BaseModel):
     entities: Annotated[
         list[VariableBody], Field(description="A list of entities to be 
updated.", title="Entities")
     ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
     action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 

Reply via email to