This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bdf6c91e583 adds _ensure_identity, modifies files, and tests (#58563)
bdf6c91e583 is described below
commit bdf6c91e583a797efca41da7ffac13c685a8e88a
Author: Henry Chen <[email protected]>
AuthorDate: Mon Nov 24 19:20:22 2025 +0800
adds _ensure_identity, modifies files, and tests (#58563)
---
.../azure/operators/container_instances.py | 79 ++++++++++++++++++++--
.../azure/operators/test_container_instances.py | 35 ++++++++++
2 files changed, 107 insertions(+), 7 deletions(-)
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
index 05ec32e0320..0be1ccc484d 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -21,7 +21,7 @@ import re
import time
from collections import namedtuple
from collections.abc import Sequence
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
from azure.mgmt.containerinstance.models import (
Container,
@@ -33,8 +33,10 @@ from azure.mgmt.containerinstance.models import (
DnsConfiguration,
EnvironmentVariable,
IpAddress,
+ ResourceIdentityType,
ResourceRequests,
ResourceRequirements,
+ UserAssignedIdentities,
Volume as _AzureVolume,
VolumeMount,
)
@@ -147,10 +149,13 @@ class AzureContainerInstancesOperator(BaseOperator):
},
priority="Regular",
identity = {
- {
- "type": "UserAssigned",
- "resource_ids":
["/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/my_rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my_identity"],
- },
+ "type": "UserAssigned" | "SystemAssigned" |
"SystemAssigned,UserAssigned",
+ "resource_ids": [
+
"/subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<id>"
+ ]
+ "user_assigned_identities": {
+ "/subscriptions/.../userAssignedIdentities/<id>": {}
+ }
}
command=["/bin/echo", "world"],
task_id="start_container",
@@ -188,7 +193,7 @@ class AzureContainerInstancesOperator(BaseOperator):
dns_config: DnsConfiguration | None = None,
diagnostics: ContainerGroupDiagnostics | None = None,
priority: str | None = "Regular",
- identity: ContainerGroupIdentity | None = None,
+ identity: ContainerGroupIdentity | dict | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -231,7 +236,7 @@ class AzureContainerInstancesOperator(BaseOperator):
self.dns_config = dns_config
self.diagnostics = diagnostics
self.priority = priority
- self.identity = identity
+ self.identity = self._ensure_identity(identity)
if self.priority not in ["Regular", "Spot"]:
raise AirflowException(
"Invalid value for the priority argument. "
@@ -239,6 +244,66 @@ class AzureContainerInstancesOperator(BaseOperator):
f"Found `{self.priority}`."
)
+ # helper to accept dict (user-friendly) or ContainerGroupIdentity (SDK
object)
+ @staticmethod
+ def _ensure_identity(identity: ContainerGroupIdentity | dict | None) ->
ContainerGroupIdentity | None:
+ """
+ Normalize identity input into a ContainerGroupIdentity instance.
+
+ Accepts:
+ - None -> returns None
+ - ContainerGroupIdentity -> returned as-is
+ - dict -> converted to ContainerGroupIdentity
+ - any other object -> returned as-is (pass-through) to preserve
backwards compatibility
+
+ Expected dict shapes:
+ {"type": "UserAssigned", "resource_ids":
["/.../userAssignedIdentities/id1", ...]}
+ or
+ {"type": "SystemAssigned"}
+ or
+ {"type": "SystemAssigned,UserAssigned", "resource_ids": [...]}
+ """
+ if identity is None:
+ return None
+
+ if isinstance(identity, ContainerGroupIdentity):
+ return identity
+
+ if isinstance(identity, dict):
+ # require type
+ id_type = identity.get("type")
+ if not id_type:
+ raise AirflowException(
+ "identity dict must include 'type' key with value
'UserAssigned' or 'SystemAssigned'"
+ )
+
+ # map common string type names to ResourceIdentityType enum values
if available
+ type_map = {
+ "SystemAssigned": ResourceIdentityType.system_assigned,
+ "UserAssigned": ResourceIdentityType.user_assigned,
+ "SystemAssigned,UserAssigned":
ResourceIdentityType.system_assigned_user_assigned,
+ "SystemAssigned, UserAssigned":
ResourceIdentityType.system_assigned_user_assigned,
+ }
+ cg_type = type_map.get(id_type, id_type)
+
+ # build user_assigned_identities mapping if resource_ids provided
+ resource_ids = identity.get("resource_ids")
+ if resource_ids:
+ if not isinstance(resource_ids, (list, tuple)):
+ raise AirflowException("identity['resource_ids'] must be a
list of resource id strings")
+ user_assigned_identities: dict[str, Any] = {rid: {} for rid in
resource_ids}
+ else:
+ # accept a pre-built mapping if given
+ user_assigned_identities =
identity.get("user_assigned_identities") or {}
+
+ return ContainerGroupIdentity(
+ type=cg_type,
+ user_assigned_identities=cast(
+ "dict[str, UserAssignedIdentities] | None",
user_assigned_identities
+ ),
+ )
+ return identity
+
def execute(self, context: Context) -> int:
# Check name again in case it was templated.
self._check_name(self.name)
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
index 39756cab095..b8653fc6d62 100644
---
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
@@ -611,6 +611,41 @@ class TestACIOperator:
assert called_cg.identity == identity
+
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
+ def test_execute_with_identity_dict(self, aci_mock):
+ # New test: pass a dict and verify operator converts it to
ContainerGroupIdentity
+ resource_id =
"/subscriptions/00000000-0000-0000-0000-00000000000/resourceGroups/my_rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/my_identity"
+ identity_dict = {
+ "type": "UserAssigned",
+ "resource_ids": [resource_id],
+ }
+
+ aci_mock.return_value.get_state.return_value = make_mock_container(
+ state="Terminated", exit_code=0, detail_status="test"
+ )
+
+ aci_mock.return_value.exists.return_value = False
+
+ aci = AzureContainerInstancesOperator(
+ ci_conn_id=None,
+ registry_conn_id=None,
+ resource_group="resource-group",
+ name="container-name",
+ image="container-image",
+ region="region",
+ task_id="task",
+ identity=identity_dict,
+ )
+ aci.execute(None)
+ assert aci_mock.return_value.create_or_update.call_count == 1
+ (_, _, called_cg), _ = aci_mock.return_value.create_or_update.call_args
+
+ # verify the operator converted dict -> ContainerGroupIdentity with
proper mapping
+ assert hasattr(called_cg, "identity")
+ assert called_cg.identity is not None
+ # user_assigned_identities should contain the resource id as a key
+ assert resource_id in (called_cg.identity.user_assigned_identities or
{})
+
class XcomMock:
def __init__(self) -> None: