This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d597ed99712 Add update_mask support for bulk PATCH APIs (#54597)
d597ed99712 is described below
commit d597ed997128ae9f26487ba3ca212a4318a36581
Author: Brunda10 <[email protected]>
AuthorDate: Tue Sep 30 20:48:28 2025 +0530
Add update_mask support for bulk PATCH APIs (#54597)
* Added update_mask handling inside the 'update' action of bulk endpoints
(e.g., Pools, Connections, Task Instances, XComs).
* Added update_mask handling inside the 'update' action of bulk endpoints
(e.g., Pools, Connections, Task Instances, XComs).
* update-mask: Removed hardcoded value as an example
* Supported Common Patch Utility Func
* Update Mask with Query Parameter suggestion
Co-authored-by: LIU ZHE YOU <[email protected]>
* Merge Conflicts
* committed datamodels suggestion
Co-authored-by: Kalyan R <[email protected]>
* committed datamodels suggestion
Co-authored-by: Kalyan R <[email protected]>
* Removed Print statement
Co-authored-by: LIU ZHE YOU <[email protected]>
* Removed Print statement
Co-authored-by: LIU ZHE YOU <[email protected]>
* Removed Print statement
Co-authored-by: LIU ZHE YOU <[email protected]>
* Removed Print statement
Co-authored-by: LIU ZHE YOU <[email protected]>
* Refactor update_orm_from_pydantic-pools to accept session and perform
entity selection
* Merge Conflicts
* Update INTHEWILD.md (#55051)
Added DrMartens to the list
* Regenerate AirflowCTL datamodels
* Update INTHEWILD.md
* Merge Conflicts
* HITL changes reverted as per suggestion
* HITL changes reverted as per suggestion
* XCom and HITL file changes reverted
* Formatting extra spaces
* Formatting of the description
* Open-API spec updated
* Open API Spec Updated
* Added validation to prevent updates on restricted fields
* Suggested Change - Extra Space Added
* rebased (due to static checks failure)
---------
Co-authored-by: LIU ZHE YOU <[email protected]>
Co-authored-by: Kalyan R <[email protected]>
Co-authored-by: sprasad97 <[email protected]>
---
.../api_fastapi/core_api/datamodels/common.py | 8 +
.../core_api/openapi/v2-rest-api-generated.yaml | 40 +++++
.../api_fastapi/core_api/routes/public/pools.py | 34 +---
.../core_api/routes/public/variables.py | 33 +---
.../api_fastapi/core_api/services/public/common.py | 48 ++++++
.../api_fastapi/core_api/services/public/pools.py | 89 +++++++++--
.../core_api/services/public/variables.py | 60 ++++++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 60 +++++++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 16 ++
.../core_api/routes/public/test_connections.py | 48 ++++++
.../core_api/routes/public/test_pools.py | 177 +++++++++++++++++++--
.../core_api/routes/public/test_variables.py | 107 +++++++++++++
.../src/airflowctl/api/datamodels/generated.py | 28 ++++
13 files changed, 653 insertions(+), 95 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
index 1fa8fda41a6..32eb2c232ae 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py
@@ -77,6 +77,14 @@ class BulkUpdateAction(BulkBaseAction[T]):
action: Literal[BulkAction.UPDATE] = Field(description="The action to be
performed on the entities.")
entities: list[T] = Field(..., description="A list of entities to be
updated.")
+ update_mask: list[str] | None = Field(
+ default=None,
+ description=(
+ "A list of field names to update for each entity."
+ "Only these fields will be applied from the request body to the
database model."
+ "Any extra fields provided will be ignored."
+ ),
+ )
action_on_non_existence: BulkActionNotOnExistence =
BulkActionNotOnExistence.FAIL
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 90cfc65b772..cebced02a32 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9376,6 +9376,16 @@ components:
type: array
title: Entities
description: A list of entities to be updated.
+ update_mask:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Update Mask
+ description: A list of field names to update for each entity.Only
these
+ fields will be applied from the request body to the database
model.Any
+ extra fields provided will be ignored.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
@@ -9398,6 +9408,16 @@ components:
type: array
title: Entities
description: A list of entities to be updated.
+ update_mask:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Update Mask
+ description: A list of field names to update for each entity.Only
these
+ fields will be applied from the request body to the database
model.Any
+ extra fields provided will be ignored.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
@@ -9420,6 +9440,16 @@ components:
type: array
title: Entities
description: A list of entities to be updated.
+ update_mask:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Update Mask
+ description: A list of field names to update for each entity.Only
these
+ fields will be applied from the request body to the database
model.Any
+ extra fields provided will be ignored.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
@@ -9442,6 +9472,16 @@ components:
type: array
title: Entities
description: A list of entities to be updated.
+ update_mask:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Update Mask
+ description: A list of field names to update for each entity.Only
these
+ fields will be applied from the request body to the database
model.Any
+ extra fields provided will be ignored.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
index 6a7e2f646d9..0646b6af5cf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/pools.py
@@ -19,8 +19,6 @@ from __future__ import annotations
from typing import Annotated
from fastapi import Depends, HTTPException, Query, status
-from fastapi.exceptions import RequestValidationError
-from pydantic import ValidationError
from sqlalchemy import delete, select
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -33,7 +31,6 @@ from airflow.api_fastapi.common.parameters import (
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.common import BulkBody,
BulkResponse
from airflow.api_fastapi.core_api.datamodels.pools import (
- BasePool,
PoolBody,
PoolCollectionResponse,
PoolPatchBody,
@@ -45,7 +42,7 @@ from airflow.api_fastapi.core_api.security import (
requires_access_pool,
requires_access_pool_bulk,
)
-from airflow.api_fastapi.core_api.services.public.pools import BulkPoolService
+from airflow.api_fastapi.core_api.services.public.pools import
BulkPoolService, update_orm_from_pydantic
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.models.pool import Pool
@@ -150,35 +147,8 @@ def patch_pool(
status.HTTP_400_BAD_REQUEST,
"Invalid body, pool name from request body doesn't match uri
parameter",
)
- # Only slots and include_deferred can be modified in 'default_pool'
- if pool_name == Pool.DEFAULT_POOL_NAME:
- if update_mask and all(mask.strip() in {"slots", "include_deferred"}
for mask in update_mask):
- pass
- else:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST,
- "Only slots and included_deferred can be modified on Default
Pool",
- )
- pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
- if not pool:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, detail=f"The Pool with name:
`{pool_name}` was not found"
- )
-
- fields_to_update = patch_body.model_fields_set
- if update_mask:
- fields_to_update = fields_to_update.intersection(update_mask)
- data = patch_body.model_dump(include=fields_to_update, by_alias=True)
- else:
- data = patch_body.model_dump(include=fields_to_update, by_alias=True)
- try:
- BasePool.model_validate(data)
- except ValidationError as e:
- raise RequestValidationError(errors=e.errors())
-
- for key, value in data.items():
- setattr(pool, key, value)
+ pool = update_orm_from_pydantic(pool_name, patch_body, update_mask,
session)
return pool
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
index 36fc5be44b1..d2b5a294acf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/variables.py
@@ -19,8 +19,6 @@ from __future__ import annotations
from typing import Annotated
from fastapi import Depends, HTTPException, Query, status
-from fastapi.exceptions import RequestValidationError
-from pydantic import ValidationError
from sqlalchemy import select
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -43,7 +41,10 @@ from airflow.api_fastapi.core_api.security import (
requires_access_variable,
requires_access_variable_bulk,
)
-from airflow.api_fastapi.core_api.services.public.variables import
BulkVariableService
+from airflow.api_fastapi.core_api.services.public.variables import (
+ BulkVariableService,
+ update_orm_from_pydantic,
+)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.models.variable import Variable
@@ -142,31 +143,7 @@ def patch_variable(
update_mask: list[str] | None = Query(None),
) -> VariableResponse:
"""Update a variable by key."""
- if patch_body.key != variable_key:
- raise HTTPException(
- status.HTTP_400_BAD_REQUEST, "Invalid body, key from request body
doesn't match uri parameter"
- )
- non_update_fields = {"key"}
- variable =
session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
- if not variable:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
- )
-
- fields_to_update = patch_body.model_fields_set
- if update_mask:
- fields_to_update = fields_to_update.intersection(update_mask)
- else:
- try:
- VariableBody(**patch_body.model_dump())
- except ValidationError as e:
- raise RequestValidationError(errors=e.errors())
-
- data = patch_body.model_dump(include=fields_to_update - non_update_fields,
by_alias=True)
-
- for key, val in data.items():
- setattr(variable, key, val)
-
+ variable = update_orm_from_pydantic(variable_key, patch_body, update_mask,
session)
return variable
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
index 94e1157d1c0..37d3e5e08df 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
@@ -20,6 +20,9 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Generic
+from fastapi import HTTPException, status
+from pydantic import BaseModel
+from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import Session
from airflow.api_fastapi.core_api.datamodels.common import (
@@ -72,3 +75,48 @@ class BulkService(Generic[T], ABC):
def handle_bulk_delete(self, action: BulkDeleteAction[T], results:
BulkActionResponse) -> None:
"""Bulk delete entities."""
raise NotImplementedError
+
+ @staticmethod
+ def apply_patch_with_update_mask(
+ model: DeclarativeMeta,
+ patch_body: BaseModel,
+ update_mask: list[str] | None,
+ non_update_fields: set[str] | None = None,
+ ) -> DeclarativeMeta:
+ """
+ Apply a patch to the given model using the provided update mask.
+
+ :param model: The SQLAlchemy model instance to update.
+ :param patch_body: Pydantic model containing patch data.
+ :param update_mask: Optional list of fields to update.
+ :param non_update_fields: Fields that should not be updated.
+ :return: The updated SQLAlchemy model instance.
+ :raises HTTPException: If invalid fields are provided in update_mask.
+ """
+ # Always dump without aliases for internal validation
+ raw_data = patch_body.model_dump(by_alias=False)
+ fields_to_update = set(patch_body.model_fields_set)
+
+ non_update_fields = non_update_fields or set()
+
+ if update_mask:
+ restricted_in_mask =
set(update_mask).intersection(non_update_fields)
+ if restricted_in_mask:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"Update not allowed: the following fields are
immutable and cannot be modified: {restricted_in_mask}",
+ )
+ fields_to_update = fields_to_update.intersection(update_mask)
+
+ if non_update_fields:
+ fields_to_update = fields_to_update - non_update_fields
+
+ validated_data = {key: raw_data[key] for key in fields_to_update if
key in raw_data}
+
+ data = patch_body.model_dump(include=set(validated_data.keys()),
by_alias=True)
+
+ # Update the model with the validated data
+ for key, value in data.items():
+ setattr(model, key, value)
+
+ return model
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index 98e16cbdccc..cc2d6696b5c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -18,9 +18,11 @@
from __future__ import annotations
from fastapi import HTTPException, status
+from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
BulkActionOnExistence,
@@ -30,12 +32,79 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkUpdateAction,
)
from airflow.api_fastapi.core_api.datamodels.pools import (
+ BasePool,
PoolBody,
+ PoolPatchBody,
)
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.models.pool import Pool
+def update_orm_from_pydantic(
+ pool_name: str,
+ patch_body: PoolBody | PoolPatchBody,
+ update_mask: list[str] | None,
+ session: SessionDep,
+) -> Pool:
+ """
+ Update an existing pool.
+
+ :param pool_name: The name of the existing Pool to be updated.
+ :param patch_body: Pydantic model containing the fields to update.
+ :param update_mask: Specific fields to update. If None, all provided
fields will be considered.
+ :param session: The database session dependency.
+ :return: The updated Pool instance.
+ :raises HTTPException: If attempting to update disallowed fields on
``default_pool``.
+ """
+ # Special restriction: default pool only allows limited fields to be
patched
+ pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
+ if not pool:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, detail=f"The Pool with name:
`{pool_name}` was not found"
+ )
+ if pool_name == Pool.DEFAULT_POOL_NAME:
+ if update_mask and all(mask.strip() in {"slots", "include_deferred"}
for mask in update_mask):
+ # Validate only slots/include_deferred
+ try:
+ patch_body_subset = patch_body.model_dump(
+ include={"slots", "include_deferred"}, exclude_unset=True,
by_alias=True
+ )
+ # Re-run validation with BasePool but only on allowed fields
+ PoolPatchBody.model_validate(patch_body_subset)
+ except ValidationError as e:
+ raise RequestValidationError(errors=e.errors())
+ else:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ "Only slots and included_deferred can be modified on Default
Pool",
+ )
+ else:
+ fields_to_update = patch_body.model_fields_set
+ try:
+ # Dump with both input + output aliases handled
+ body_dict = patch_body.model_dump(
+ include=fields_to_update,
+ by_alias=True, # ensures we get the API-facing alias keys
+ )
+
+ # Normalize keys for BasePool (expects "pool")
+ if "name" in body_dict and "pool" not in body_dict:
+ body_dict["pool"] = body_dict.pop("name")
+
+ BasePool.model_validate(body_dict)
+
+ except ValidationError as e:
+ raise RequestValidationError(errors=e.errors())
+
+ # Delegate patch application to the common utility
+ return BulkService.apply_patch_with_update_mask(
+ model=pool,
+ patch_body=patch_body,
+ update_mask=update_mask,
+ non_update_fields=None,
+ )
+
+
class BulkPoolService(BulkService[PoolBody]):
"""Service for handling bulk operations on pools."""
@@ -90,7 +159,6 @@ class BulkPoolService(BulkService[PoolBody]):
"""Bulk Update pools."""
to_update_pool_names = {pool.pool for pool in action.entities}
_, matched_pool_names, not_found_pool_names =
self.categorize_pools(to_update_pool_names)
-
try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_pool_names:
raise HTTPException(
@@ -101,24 +169,13 @@ class BulkPoolService(BulkService[PoolBody]):
update_pool_names = matched_pool_names
else:
update_pool_names = to_update_pool_names
-
for pool in action.entities:
- if pool.pool in update_pool_names:
- old_pool =
self.session.scalar(select(Pool).filter(Pool.pool == pool.pool).limit(1))
-
- data = {
- key: val for key, val in
pool.model_dump(by_alias=True).items() if val is not None
- }
- try:
- PoolBody(**data)
-
- for key, val in data.items():
- setattr(old_pool, key, val)
+ if pool.pool not in update_pool_names:
+ continue
- results.success.append(str(pool.pool))
+ pool = update_orm_from_pydantic(pool.pool, pool,
action.update_mask, self.session)
- except ValidationError as e:
- results.errors.append({"error": f"{e.errors()}"})
+ results.success.append(str(pool.pool)) # use request field,
always consistent
except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code":
e.status_code})
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
index 0208ea1a0a5..4c049ca6e78 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
@@ -18,9 +18,11 @@
from __future__ import annotations
from fastapi import HTTPException, status
+from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
BulkActionOnExistence,
@@ -36,6 +38,52 @@ from airflow.api_fastapi.core_api.services.public.common
import BulkService
from airflow.models.variable import Variable
+def update_orm_from_pydantic(
+ variable_key: str, patch_body: VariableBody, update_mask: list[str] |
None, session: SessionDep
+) -> Variable:
+ """
+ Update an existing Variable.
+
+ :param variable_key: The name of the existing Variable_key to update.
+ :param patch_body: The patch request body containing fields to update.
+ :param update_mask: List of fields to update. If None, all provided fields
will be updated.
+ :param session: The database session dependency.
+ :return: The updated Variable object.
+ :raises HTTPException: If attempting to update restricted fields (e.g.,
``key``).
+ """
+ # Key field is immutable → cannot be patched
+
+ if patch_body.key != variable_key:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, "Invalid body, key from request body
doesn't match uri parameter"
+ )
+ old_variable =
session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
+ if not old_variable:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
+ )
+
+ try:
+ VariableBody(**patch_body.model_dump())
+ except ValidationError as e:
+ raise RequestValidationError(errors=e.errors())
+ non_update_fields = {"key"}
+
+ if patch_body.key != old_variable.key:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ "Invalid body, key from request body doesn't match uri parameter",
+ )
+
+ # Apply patch via utility
+ return BulkService.apply_patch_with_update_mask(
+ model=old_variable,
+ patch_body=patch_body,
+ update_mask=update_mask,
+ non_update_fields=non_update_fields,
+ )
+
+
class BulkVariableService(BulkService[VariableBody]):
"""Service for handling bulk operations on variables."""
@@ -81,7 +129,6 @@ class BulkVariableService(BulkService[VariableBody]):
"""Bulk Update variables."""
to_update_keys = {variable.key for variable in action.entities}
matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
-
try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_keys:
raise HTTPException(
@@ -94,14 +141,11 @@ class BulkVariableService(BulkService[VariableBody]):
update_keys = to_update_keys
for variable in action.entities:
- if variable.key in update_keys:
- old_variable =
self.session.scalar(select(Variable).filter_by(key=variable.key).limit(1))
- VariableBody(**variable.model_dump())
- data = variable.model_dump(exclude={"key"}, by_alias=True)
+ if variable.key not in update_keys:
+ continue
+ variable = update_orm_from_pydantic(variable.key, variable,
action.update_mask, self.session)
- for key, val in data.items():
- setattr(old_variable, key, val)
- results.success.append(variable.key)
+ results.success.append(variable.key)
except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code":
e.status_code})
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 36273f8818a..c21385737a0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1063,6 +1063,21 @@ export const $BulkUpdateAction_BulkTaskInstanceBody_ = {
title: 'Entities',
description: 'A list of entities to be updated.'
},
+ update_mask: {
+ anyOf: [
+ {
+ items: {
+ type: 'string'
+ },
+ type: 'array'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Update Mask',
+ description: 'A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.'
+ },
action_on_non_existence: {
'$ref': '#/components/schemas/BulkActionNotOnExistence',
default: 'fail'
@@ -1090,6 +1105,21 @@ export const $BulkUpdateAction_ConnectionBody_ = {
title: 'Entities',
description: 'A list of entities to be updated.'
},
+ update_mask: {
+ anyOf: [
+ {
+ items: {
+ type: 'string'
+ },
+ type: 'array'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Update Mask',
+ description: 'A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.'
+ },
action_on_non_existence: {
'$ref': '#/components/schemas/BulkActionNotOnExistence',
default: 'fail'
@@ -1117,6 +1147,21 @@ export const $BulkUpdateAction_PoolBody_ = {
title: 'Entities',
description: 'A list of entities to be updated.'
},
+ update_mask: {
+ anyOf: [
+ {
+ items: {
+ type: 'string'
+ },
+ type: 'array'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Update Mask',
+ description: 'A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.'
+ },
action_on_non_existence: {
'$ref': '#/components/schemas/BulkActionNotOnExistence',
default: 'fail'
@@ -1144,6 +1189,21 @@ export const $BulkUpdateAction_VariableBody_ = {
title: 'Entities',
description: 'A list of entities to be updated.'
},
+ update_mask: {
+ anyOf: [
+ {
+ items: {
+ type: 'string'
+ },
+ type: 'array'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Update Mask',
+ description: 'A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.'
+ },
action_on_non_existence: {
'$ref': '#/components/schemas/BulkActionNotOnExistence',
default: 'fail'
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 58a082807a2..4d04524d320 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -341,6 +341,10 @@ export type BulkUpdateAction_BulkTaskInstanceBody_ = {
* A list of entities to be updated.
*/
entities: Array<BulkTaskInstanceBody>;
+ /**
+ * A list of field names to update for each entity.Only these fields will
be applied from the request body to the database model.Any extra fields
provided will be ignored.
+ */
+ update_mask?: Array<(string)> | null;
action_on_non_existence?: BulkActionNotOnExistence;
};
@@ -353,6 +357,10 @@ export type BulkUpdateAction_ConnectionBody_ = {
* A list of entities to be updated.
*/
entities: Array<ConnectionBody>;
+ /**
+ * A list of field names to update for each entity.Only these fields will
be applied from the request body to the database model.Any extra fields
provided will be ignored.
+ */
+ update_mask?: Array<(string)> | null;
action_on_non_existence?: BulkActionNotOnExistence;
};
@@ -365,6 +373,10 @@ export type BulkUpdateAction_PoolBody_ = {
* A list of entities to be updated.
*/
entities: Array<PoolBody>;
+ /**
+ * A list of field names to update for each entity.Only these fields will
be applied from the request body to the database model.Any extra fields
provided will be ignored.
+ */
+ update_mask?: Array<(string)> | null;
action_on_non_existence?: BulkActionNotOnExistence;
};
@@ -377,6 +389,10 @@ export type BulkUpdateAction_VariableBody_ = {
* A list of entities to be updated.
*/
entities: Array<VariableBody>;
+ /**
+ * A list of field names to update for each entity.Only these fields will
be applied from the request body to the database model.Any extra fields
provided will be ignored.
+ */
+ update_mask?: Array<(string)> | null;
action_on_non_existence?: BulkActionNotOnExistence;
};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index b8f60a86762..76224542a2b 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -1138,6 +1138,26 @@ class TestBulkConnections(TestConnectionEndpoint):
},
id="test_update_with_fail",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": TEST_CONN_TYPE,
+ "description": "updated_description",
+ }
+ ],
+ "update_mask": ["description"],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ {"update": {"success": [TEST_CONN_ID], "errors": []}},
+ id="test_connection_update_with_valid_update_mask",
+ ),
pytest.param(
{
"actions": [
@@ -1243,6 +1263,34 @@ class TestBulkConnections(TestConnectionEndpoint):
},
id="test_create_update_delete",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": TEST_CONN_TYPE,
+ "description": "updated_description",
+ }
+ ],
+ "update_mask": ["description"],
+ "action_on_non_existence": "fail",
+ },
+ {
+ "action": "delete",
+ "entities": [TEST_CONN_ID],
+ "action_on_non_existence": "fail",
+ },
+ ]
+ },
+ {
+ "update": {"success": [TEST_CONN_ID], "errors": []},
+ "delete": {"success": [TEST_CONN_ID], "errors": []},
+ },
+ id="test_connection_create_update_delete_with_update_mask",
+ ),
],
)
def test_bulk_connections(self, test_client, actions, expected_results,
session):
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index d4d16abfb8f..4c2ff3f9d77 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -283,7 +283,7 @@ class TestPatchPool(TestPoolsEndpoint):
(
Pool.DEFAULT_POOL_NAME,
{"update_mask": ["slots"]},
- {"slots": 150},
+ {"slots": 150, "name": Pool.DEFAULT_POOL_NAME,
"include_deferred": True},
200,
{
"deferred_slots": 0,
@@ -584,7 +584,14 @@ class TestBulkPools(TestPoolsEndpoint):
"actions": [
{
"action": "update",
- "entities": [{"name": "pool2", "slots": 10,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool2",
+ "slots": 10,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "fail",
}
]
@@ -597,7 +604,14 @@ class TestBulkPools(TestPoolsEndpoint):
"actions": [
{
"action": "update",
- "entities": [{"name": "pool4", "slots": 20,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool4",
+ "slots": 20,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "skip",
}
]
@@ -610,7 +624,14 @@ class TestBulkPools(TestPoolsEndpoint):
"actions": [
{
"action": "update",
- "entities": [{"name": "pool4", "slots": 10,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool4",
+ "slots": 10,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "fail",
}
]
@@ -628,6 +649,29 @@ class TestBulkPools(TestPoolsEndpoint):
},
id="test_update_not_found",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "name": "pool1",
+ "slots": 50,
+ "description": "Updated description",
+ "include_deferred": False,
+ }
+ ],
+ "update_mask": ["slots", "description"],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ {
+ "update": {"success": ["pool1"], "errors": []},
+ },
+ id="test_update_with_valid_update_mask",
+ ),
pytest.param(
{"actions": [{"action": "delete", "entities": ["pool1"],
"action_on_non_existence": "skip"}]},
{"delete": {"success": ["pool1"], "errors": []}},
@@ -663,7 +707,14 @@ class TestBulkPools(TestPoolsEndpoint):
},
{
"action": "update",
- "entities": [{"name": "pool1", "slots": 10,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool1",
+ "slots": 10,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "fail",
},
{"action": "delete", "entities": ["pool2"],
"action_on_non_existence": "skip"},
@@ -686,7 +737,14 @@ class TestBulkPools(TestPoolsEndpoint):
},
{
"action": "update",
- "entities": [{"name": "pool1", "slots": 100,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool1",
+ "slots": 100,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "fail",
},
{"action": "delete", "entities": ["pool4"],
"action_on_non_existence": "skip"},
@@ -717,7 +775,14 @@ class TestBulkPools(TestPoolsEndpoint):
},
{
"action": "update",
- "entities": [{"name": "pool5", "slots": 10,
"description": "New Description"}],
+ "entities": [
+ {
+ "name": "pool5",
+ "slots": 10,
+ "description": "New Description",
+ "include_deferred": False,
+ }
+ ],
"action_on_non_existence": "skip",
},
{"action": "delete", "entities": ["pool5"],
"action_on_non_existence": "skip"},
@@ -741,7 +806,12 @@ class TestBulkPools(TestPoolsEndpoint):
{
"action": "update",
"entities": [
- {"name": "pool5", "slots": 100, "description":
"New test Description"}
+ {
+ "name": "pool5",
+ "slots": 100,
+ "description": "New test Description",
+ "include_deferred": False,
+ }
],
"action_on_non_existence": "fail",
},
@@ -768,7 +838,12 @@ class TestBulkPools(TestPoolsEndpoint):
{
"action": "update",
"entities": [
- {"name": "pool1", "slots": 100, "description":
"New test Description"}
+ {
+ "name": "pool1",
+ "slots": 100,
+ "description": "New test Description",
+ "include_deferred": False,
+ }
],
"action_on_non_existence": "fail",
},
@@ -782,7 +857,12 @@ class TestBulkPools(TestPoolsEndpoint):
{
"action": "update",
"entities": [
- {"name": "pool8", "slots": 100, "description":
"New test Description"}
+ {
+ "name": "pool8",
+ "slots": 100,
+ "description": "New test Description",
+ "include_deferred": False,
+ }
],
"action_on_non_existence": "fail",
},
@@ -797,7 +877,12 @@ class TestBulkPools(TestPoolsEndpoint):
{
"action": "update",
"entities": [
- {"name": "pool9", "slots": 100, "description":
"New test Description"}
+ {
+ "name": "pool9",
+ "slots": 100,
+ "description": "New test Description",
+ "include_deferred": False,
+ }
],
"action_on_non_existence": "fail",
},
@@ -830,6 +915,41 @@ class TestBulkPools(TestPoolsEndpoint):
},
id="test_repeated_actions",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "create",
+ "entities": [{"name": "pool6", "slots": 5,
"description": "Initial Description"}],
+ "action_on_existence": "fail",
+ },
+ {
+ "action": "update",
+ "entities": [
+ {
+ "name": "pool6",
+ "slots": 50,
+ "description": "Masked Update Description",
+ "include_deferred": False,
+ }
+ ],
+ "update_mask": ["slots"],
+ "action_on_non_existence": "fail",
+ },
+ {
+ "action": "delete",
+ "entities": ["pool6"],
+ "action_on_non_existence": "fail",
+ },
+ ]
+ },
+ {
+ "create": {"success": ["pool6"], "errors": []},
+ "update": {"success": ["pool6"], "errors": []},
+ "delete": {"success": ["pool6"], "errors": []},
+ },
+ id="test_dependent_actions_with_update_mask",
+ ),
],
)
def test_bulk_pools(self, test_client, actions, expected_results, session):
@@ -840,6 +960,41 @@ class TestBulkPools(TestPoolsEndpoint):
assert response_data[key] == value
check_last_log(session, dag_id=None, event="bulk_pools",
logical_date=None)
+ def test_update_mask_preserves_other_fields(self, test_client, session):
+ # Arrange: seed pools; pool1 presumably starts with description=None, include_deferred=True (TODO confirm)
+ self.create_pools()
+
+ # Act: send a full entity payload but restrict the bulk update to "slots" via update_mask
+ response = test_client.patch(
+ "/pools",
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "name": "pool1",
+ "slots": 50,
+ "description": "Should not be updated",
+ "include_deferred": False,
+ }
+ ],
+ "update_mask": ["slots"], # only slots should update
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data["update"]["success"] == ["pool1"]
+
+ # Assert: re-read pool1 from the DB; only the masked field may differ from its seeded value
+ updated_pool = session.query(Pool).filter_by(pool="pool1").one()
+ assert updated_pool.slots == 50 # in update_mask: request value applied
+ assert updated_pool.description is None # not in update_mask: request value ignored
+ assert updated_pool.include_deferred is True # not in update_mask: request value ignored
+
def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.patch("/pools", json={})
assert response.status_code == 401
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
index 076905ea03b..399c8a6976f 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
@@ -745,6 +745,81 @@ class TestBulkVariables(TestVariableEndpoint):
},
id="test_update_not_found",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "key": "test_variable_key",
+ "value": "update_mask_value",
+ "description": "Updated variable 1",
+ }
+ ],
+ "update_mask": ["value"],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ {"update": {"success": ["test_variable_key"], "errors": []}},
+ id="test_successful_update_mask",
+ ),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "key": "test_variable_key",
+ "value": "new_value",
+ "description": "Updated description",
+ }
+ ],
+ "update_mask": ["value"],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ {
+ "update": {
+ "success": ["test_variable_key"],
+ "errors": [],
+ }
+ },
+ id="test_variable_update_with_valid_update_mask",
+ ),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "key": "test_variable_key",
+ "value": "new_value",
+ "description": "Updated description",
+ }
+ ],
+ "update_mask": ["key", "value"],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ },
+ {
+ "update": {
+ "success": [],
+ "errors": [
+ {
+ "error": "Update not allowed: the following
fields are immutable and cannot be modified: {'key'}",
+ "status_code": 400,
+ }
+ ],
+ }
+ },
+ id="test_bulk_update_should_fail_on_restricted_key",
+ ),
pytest.param(
{
"actions": [
@@ -1064,6 +1139,38 @@ class TestBulkVariables(TestVariableEndpoint):
},
id="test_repeated_actions",
),
+ pytest.param(
+ {
+ "actions": [
+ {
+ "action": "create",
+ "entities": [
+ {"key": "var1", "value": "initial",
"description": "Initial Description"}
+ ],
+ "action_on_existence": "fail",
+ },
+ {
+ "action": "update",
+ "entities": [
+ {"key": "var1", "value": "updated",
"description": "Updated Description"}
+ ],
+ "update_mask": ["value"],
+ "action_on_non_existence": "fail",
+ },
+ {
+ "action": "delete",
+ "entities": ["var1"],
+ "action_on_non_existence": "fail",
+ },
+ ]
+ },
+ {
+ "create": {"success": ["var1"], "errors": []},
+ "update": {"success": ["var1"], "errors": []},
+ "delete": {"success": ["var1"], "errors": []},
+ },
+ id="test_variable_dependent_actions_with_update_mask",
+ ),
],
)
def test_bulk_variables(self, test_client, actions, expected_results,
session):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 6ea61b24cf8..b056e3c5912 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1164,6 +1164,13 @@ class BulkUpdateActionBulkTaskInstanceBody(BaseModel):
entities: Annotated[
list[BulkTaskInstanceBody], Field(description="A list of entities to
be updated.", title="Entities")
]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
action_on_non_existence: BulkActionNotOnExistence | None = "fail"
@@ -1177,6 +1184,13 @@ class BulkUpdateActionConnectionBody(BaseModel):
entities: Annotated[
list[ConnectionBody], Field(description="A list of entities to be
updated.", title="Entities")
]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
action_on_non_existence: BulkActionNotOnExistence | None = "fail"
@@ -1190,6 +1204,13 @@ class BulkUpdateActionPoolBody(BaseModel):
entities: Annotated[
list[PoolBody], Field(description="A list of entities to be updated.",
title="Entities")
]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
action_on_non_existence: BulkActionNotOnExistence | None = "fail"
@@ -1203,6 +1224,13 @@ class BulkUpdateActionVariableBody(BaseModel):
entities: Annotated[
list[VariableBody], Field(description="A list of entities to be
updated.", title="Entities")
]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
action_on_non_existence: BulkActionNotOnExistence | None = "fail"