This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 805a4750411 Update UI to allow creation of DagRuns with partition key
(#58004)
805a4750411 is described below
commit 805a4750411d7a9e702ba944263d9832110ccfd6
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Nov 10 13:27:23 2025 -0800
Update UI to allow creation of DagRuns with partition key (#58004)
Allow triggering dag runs for a specific partition key. When there's a
partition key associated with a dag run, show it in the UI.
---
.../api_fastapi/core_api/datamodels/assets.py | 2 +
.../api_fastapi/core_api/datamodels/dag_run.py | 3 +
.../core_api/openapi/v2-rest-api-generated.yaml | 21 +++
.../api_fastapi/core_api/routes/public/assets.py | 1 +
.../api_fastapi/core_api/routes/public/dag_run.py | 1 +
.../execution_api/datamodels/asset_event.py | 1 +
.../execution_api/datamodels/taskinstance.py | 1 +
.../execution_api/routes/asset_events.py | 1 +
.../api_fastapi/execution_api/versions/__init__.py | 2 +
.../execution_api/versions/v2025_11_07.py | 47 ++++++
airflow-core/src/airflow/models/dagrun.py | 1 +
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 46 +++++-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 +
.../airflow/ui/public/i18n/locales/en/common.json | 1 +
.../src/components/TriggerDag/TriggerDAGForm.tsx | 21 +++
.../ui/src/pages/Asset/CreateAssetEventModal.tsx | 24 ++-
.../src/airflow/ui/src/pages/Run/Header.tsx | 8 +
.../src/airflow/ui/src/queries/useTrigger.ts | 1 +
.../core_api/routes/public/test_assets.py | 7 +
.../core_api/routes/public/test_dag_run.py | 5 +
.../versions/head/test_asset_events.py | 17 +++
.../versions/head/test_task_instances.py | 1 +
.../versions/v2025_09_23/test_asset_events.py | 166 +++++++++++++++++++++
.../tests/unit/callbacks/test_callback_requests.py | 6 +
.../tests/unit/cli/commands/test_asset_command.py | 2 +
.../tests/unit/dag_processing/test_processor.py | 9 ++
.../src/airflowctl/api/datamodels/generated.py | 4 +
.../src/airflow/sdk/api/datamodels/_generated.py | 4 +-
.../task_sdk/execution_time/test_supervisor.py | 1 +
29 files changed, 400 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index 4394c21acf0..d6a1106dbc2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -140,6 +140,7 @@ class AssetEventResponse(BaseModel):
source_map_index: int
created_dagruns: list[DagRunAssetReference]
timestamp: datetime
+ partition_key: str | None = None
@field_validator("extra", mode="after")
@classmethod
@@ -174,6 +175,7 @@ class CreateAssetEventsBody(StrictBaseModel):
"""Create asset events request."""
asset_id: int
+ partition_key: str | None = None
extra: dict = Field(default_factory=dict)
@field_validator("extra", mode="after")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 08850367663..56546b58996 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -84,6 +84,7 @@ class DAGRunResponse(BaseModel):
dag_versions: list[DagVersionResponse]
bundle_version: str | None
dag_display_name: str = Field(validation_alias=AliasPath("dag_model",
"dag_display_name"))
+ partition_key: str | None
class DAGRunCollectionResponse(BaseModel):
@@ -104,6 +105,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
conf: dict | None = Field(default_factory=dict)
note: str | None = None
+ partition_key: str | None = None
@model_validator(mode="after")
def check_data_intervals(self):
@@ -141,6 +143,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"run_after": run_after,
"conf": self.conf,
"note": self.note,
+ "partition_key": self.partition_key,
}
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index c81c0190bb2..188986079f6 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -8811,6 +8811,11 @@ components:
type: string
format: date-time
title: Timestamp
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
type: object
required:
- id
@@ -9804,6 +9809,11 @@ components:
asset_id:
type: integer
title: Asset Id
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
extra:
additionalProperties: true
type: object
@@ -10408,6 +10418,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
type: object
required:
- dag_run_id
@@ -10430,6 +10445,7 @@ components:
- dag_versions
- bundle_version
- dag_display_name
+ - partition_key
title: DAGRunResponse
description: DAG Run serializer for responses.
DAGRunsBatchBody:
@@ -12929,6 +12945,11 @@ components:
- type: string
- type: 'null'
title: Note
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
additionalProperties: false
type: object
required:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 94dd05e85e5..0c23cc93a78 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -361,6 +361,7 @@ def create_asset_event(
asset=asset_model,
timestamp=timestamp,
extra=body.extra,
+ partition_key=body.partition_key,
session=session,
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index a39d6fd2169..ae644e6cfc2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -466,6 +466,7 @@ def trigger_dag_run(
triggered_by=triggered_by,
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
+ partition_key=params["partition_key"],
session=session,
)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
index 181afd49c1a..64623c17fb1 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
@@ -52,6 +52,7 @@ class AssetEventResponse(BaseModel):
source_dag_id: str | None = None
source_run_id: str | None = None
source_map_index: int | None = None
+ partition_key: str | None = None
class AssetEventsResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 4213af1b120..e576ac13740 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -303,6 +303,7 @@ class DagRun(StrictBaseModel):
conf: dict[str, Any] | None = None
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]
+ partition_key: str | None
class TIRunContext(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py
index 205e7efad32..4525a9140e5 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_events.py
@@ -66,6 +66,7 @@ def _get_asset_events_through_sql_clauses(
source_dag_id=event.source_dag_id,
source_run_id=event.source_run_id,
source_map_index=event.source_map_index,
+ partition_key=event.partition_key,
)
for event in asset_events
]
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index e7301fceb47..083dadba902 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -28,9 +28,11 @@ from airflow.api_fastapi.execution_api.versions.v2025_08_10
import (
from airflow.api_fastapi.execution_api.versions.v2025_09_23 import
AddDagVersionIdField
from airflow.api_fastapi.execution_api.versions.v2025_10_27 import
MakeDagRunConfNullable
from airflow.api_fastapi.execution_api.versions.v2025_11_05 import
AddTriggeringUserNameField
+from airflow.api_fastapi.execution_api.versions.v2025_11_07 import
AddPartitionKeyField
bundle = VersionBundle(
HeadVersion(),
+ Version("2025-11-07", AddPartitionKeyField),
Version("2025-11-05", AddTriggeringUserNameField),
Version("2025-10-27", MakeDagRunConfNullable),
Version("2025-09-23", AddDagVersionIdField),
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
new file mode 100644
index 00000000000..b11811179f1
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema
+
+from airflow.api_fastapi.execution_api.datamodels.asset_event import AssetEventResponse, AssetEventsResponse
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext
+
+
+class AddPartitionKeyField(VersionChange):
+    """Add the `partition_key` field to the DagRun and AssetEventResponse models."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version = (
+ schema(DagRun).field("partition_key").didnt_exist,
+ schema(AssetEventResponse).field("partition_key").didnt_exist,
+ )
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: ignore[arg-type]
+    def remove_partition_key_from_dag_run(response: ResponseInfo) -> None:  # type: ignore[misc]
+        """Remove the `partition_key` field from the dag_run object when converting to the previous version."""
+        if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
+ response.body["dag_run"].pop("partition_key", None)
+
+    @convert_response_to_previous_version_for(AssetEventsResponse)  # type: ignore[arg-type]
+    def remove_partition_key_from_asset_events(response: ResponseInfo) -> None:  # type: ignore[misc]
+        """Remove the `partition_key` field from each asset event when converting to the previous version."""
+ events = response.body["asset_events"]
+ for elem in events:
+ elem.pop("partition_key", None)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 785e3fa8867..97598a3c2de 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1449,6 +1449,7 @@ class DagRun(Base, LoggingMixin):
state=self.state,
conf=self.conf,
consumed_asset_events=[],
+ partition_key=self.partition_key,
)
runtime_ti = RuntimeTaskInstance.model_construct(
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 0b37b80d067..b30b41d84aa 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -268,6 +268,17 @@ export const $AssetEventResponse = {
type: 'string',
format: 'date-time',
title: 'Timestamp'
+ },
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
}
},
type: 'object',
@@ -1648,6 +1659,17 @@ export const $CreateAssetEventsBody = {
type: 'integer',
title: 'Asset Id'
},
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
+ },
extra: {
additionalProperties: true,
type: 'object',
@@ -2588,10 +2610,21 @@ export const $DAGRunResponse = {
dag_display_name: {
type: 'string',
title: 'Dag Display Name'
+ },
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
}
},
type: 'object',
- required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at',
'start_date', 'end_date', 'duration', 'data_interval_start',
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type',
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note',
'dag_versions', 'bundle_version', 'dag_display_name'],
+ required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at',
'start_date', 'end_date', 'duration', 'data_interval_start',
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type',
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note',
'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key'],
title: 'DAGRunResponse',
description: 'DAG Run serializer for responses.'
} as const;
@@ -6356,6 +6389,17 @@ export const $TriggerDAGRunPostBody = {
}
],
title: 'Note'
+ },
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
}
},
additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 5f5fbda3eff..ee850b03296 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -72,6 +72,7 @@ export type AssetEventResponse = {
source_map_index: number;
created_dagruns: Array<DagRunAssetReference>;
timestamp: string;
+ partition_key?: string | null;
};
/**
@@ -501,6 +502,7 @@ export type ConnectionTestResponse = {
*/
export type CreateAssetEventsBody = {
asset_id: number;
+ partition_key?: string | null;
extra?: {
[key: string]: unknown;
};
@@ -684,6 +686,7 @@ export type DAGRunResponse = {
dag_versions: Array<DagVersionResponse>;
bundle_version: string | null;
dag_display_name: string;
+ partition_key: string | null;
};
/**
@@ -1544,6 +1547,7 @@ export type TriggerDAGRunPostBody = {
[key: string]: unknown;
} | null;
note?: string | null;
+ partition_key?: string | null;
};
/**
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 1958abbb085..c69dfcf1384 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -59,6 +59,7 @@
"dataIntervalEnd": "Data Interval End",
"dataIntervalStart": "Data Interval Start",
"lastSchedulingDecision": "Last Scheduling Decision",
+ "partitionKey": "Partition key",
"queuedAt": "Queued At",
"runAfter": "Run After",
"runType": "Run Type",
diff --git
a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
index 8c92730714d..82fac42d221 100644
--- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
@@ -48,10 +48,12 @@ export type DagRunTriggerParams = {
dagRunId: string;
logicalDate: string;
note: string;
+ partitionKey: string | undefined;
};
const TriggerDAGForm = ({ dagDisplayName, dagId, isPaused, onClose, open }:
TriggerDAGFormProps) => {
const { t: translate } = useTranslation(["common", "components"]);
+ const { t: rootTranslate } = useTranslation();
const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({});
const [formError, setFormError] = useState(false);
const initialParamsDict = useDagParams(dagId, open);
@@ -68,6 +70,7 @@ const TriggerDAGForm = ({ dagDisplayName, dagId, isPaused,
onClose, open }: Trig
// Default logical date to now, show it in the selected timezone
logicalDate: dayjs().format(DEFAULT_DATETIME_FORMAT),
note: "",
+ partitionKey: undefined,
},
});
@@ -140,6 +143,24 @@ const TriggerDAGForm = ({ dagDisplayName, dagId, isPaused,
onClose, open }: Trig
</Field.Root>
)}
/>
+ <Controller
+ control={control}
+ name="partitionKey"
+ render={({ field }) => (
+ <Field.Root mt={6} orientation="horizontal">
+ <Stack>
+ <Field.Label fontSize="md" style={{ flexBasis: "30%" }}>
+ {rootTranslate("dagRun.partitionKey")}
+ </Field.Label>
+ </Stack>
+ <Stack css={{ flexBasis: "70%" }}>
+ <Input {...field} size="sm" />
+ {/* todo: AIP-76 */}
+ {/*
<Field.HelperText>{translate("components:triggerDag.runIdHelp")}</Field.HelperText>
*/}
+ </Stack>
+ </Field.Root>
+ )}
+ />
<Controller
control={control}
name="note"
diff --git
a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
index 500c82ded1c..a97dce1f9c0 100644
--- a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
@@ -57,6 +57,7 @@ export const CreateAssetEventModal = ({ asset, onClose, open
}: Props) => {
const [extraError, setExtraError] = useState<string | undefined>();
const [unpause, setUnpause] = useState(true);
const [extra, setExtra] = useState("{}");
+ const [partitionKey, setPartitionKey] = useState<string |
undefined>(undefined);
const queryClient = useQueryClient();
const { data } = useDependenciesServiceGetDependencies({ nodeId:
`asset:${asset.id}` }, undefined, {
@@ -154,7 +155,11 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
materializeAsset({ assetId: asset.id });
} else {
createAssetEvent({
- requestBody: { asset_id: asset.id, extra: JSON.parse(extra) as
Record<string, unknown> },
+ requestBody: {
+ asset_id: asset.id,
+ extra: JSON.parse(extra) as Record<string, unknown>,
+ partition_key: partitionKey ?? null,
+ },
});
}
};
@@ -199,11 +204,18 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
</HStack>
</RadioCardRoot>
{eventType === "manual" ? (
- <Field.Root mt={6}>
- <Field.Label
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
- <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
- <Text color="fg.error">{extraError}</Text>
- </Field.Root>
+ <>
+ <Field.Root mt={6}>
+ <Field.Label
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
+ <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
+ <Text color="fg.error">{extraError}</Text>
+ </Field.Root>
+ <Field.Root mt={6}>
+ <Field.Label
fontSize="md">{translate("dagRun.partitionKey")}</Field.Label>
+ <JsonEditor onChange={setPartitionKey} value={partitionKey} />
+ <Text color="fg.error">{extraError}</Text>
+ </Field.Root>
+ </>
) : undefined}
{eventType === "materialize" && dag?.is_paused ? (
<Checkbox checked={unpause} colorPalette="brand" onChange={() =>
setUnpause(!unpause)}>
diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
index b07b2c08764..6171fd2f3cd 100644
--- a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
@@ -124,6 +124,14 @@ export const Header = ({
value: <Time datetime={dagRun.logical_date} />,
},
]),
+ ...(dagRun.partition_key === null
+ ? []
+ : [
+ {
+ label: translate("dagRun.partitionKey"),
+ value: dagRun.partition_key,
+ },
+ ]),
{
label: translate("dagRun.runType"),
value: (
diff --git a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
index 933f59ad32e..4d5213b2d5f 100644
--- a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
@@ -89,6 +89,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: {
dagId: string; onSucce
dag_run_id: checkDagRunId,
logical_date: formattedLogicalDate,
note: checkNote,
+ partition_key: dagRunRequestBody.partitionKey ?? null,
},
});
};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 0f14d58e4dd..618b5b00960 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -817,6 +817,7 @@ class TestGetAssetEvents(TestAssets):
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
},
{
"id": 2,
@@ -844,6 +845,7 @@ class TestGetAssetEvents(TestAssets):
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
},
],
"total_entries": 2,
@@ -998,6 +1000,7 @@ class TestGetAssetEvents(TestAssets):
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
},
{
"id": 2,
@@ -1025,6 +1028,7 @@ class TestGetAssetEvents(TestAssets):
}
],
"timestamp":
from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
},
],
"total_entries": 2,
@@ -1270,6 +1274,7 @@ class TestPostAssetEvents(TestAssets):
"source_map_index": -1,
"created_dagruns": [],
"timestamp": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
check_last_log(session, dag_id=None, event="create_asset_event",
logical_date=None)
@@ -1308,6 +1313,7 @@ class TestPostAssetEvents(TestAssets):
"source_map_index": -1,
"created_dagruns": [],
"timestamp": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
+ "partition_key": None,
}
def test_should_update_asset_endpoint(self, test_client, session):
@@ -1378,6 +1384,7 @@ class TestPostAssetMaterialize(TestAssets):
"dag_id": self.DAG_ASSET1_ID,
"dag_versions": mock.ANY,
"logical_date": None,
+ "partition_key": None,
"queued_at": mock.ANY,
"run_after": mock.ANY,
"start_date": None,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index cd7a0be4cf5..642fa1524c4 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -259,6 +259,7 @@ def get_dag_run_dict(run: DagRun):
"conf": run.conf,
"note": run.note,
"dag_versions": get_dag_versions_dict(run.dag_versions),
+ "partition_key": None,
}
@@ -1392,6 +1393,7 @@ class TestGetDagRunAssetTriggerEvents:
"state": "running",
}
],
+ "partition_key": None,
}
],
"total_entries": 1,
@@ -1595,6 +1597,7 @@ class TestTriggerDagRun:
"note": note,
"triggered_by": "rest_api",
"triggering_user_name": "test",
+ "partition_key": None,
}
assert response.json() == expected_response_json
@@ -1788,6 +1791,7 @@ class TestTriggerDagRun:
"triggering_user_name": "test",
"conf": {},
"note": note,
+ "partition_key": None,
}
assert response_2.status_code == 409
@@ -1876,6 +1880,7 @@ class TestTriggerDagRun:
"triggering_user_name": "test",
"conf": {},
"note": None,
+ "partition_key": None,
}
@time_machine.travel("2025-10-02 12:00:00", tick=False)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py
index a7c74879c4b..e3839f19eaf 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_events.py
@@ -41,6 +41,7 @@ def test_asset_events(session):
"source_task_id": "bar",
"source_run_id": "custom",
"source_map_index": -1,
+ "partition_key": None,
}
events = [AssetEvent(id=i, timestamp=make_timestamp(i), **common) for i in
(1, 2, 3)]
@@ -122,6 +123,7 @@ class TestGetAssetEventByAsset:
"uri": "s3://bucket/key",
},
"timestamp": "2021-01-01T00:00:00Z",
+ "partition_key": None,
},
{
"id": 2,
@@ -138,6 +140,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
{
"id": 3,
@@ -154,6 +157,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -190,6 +194,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
{
"id": 3,
@@ -206,6 +211,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -242,6 +248,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
+ "partition_key": None,
},
{
"id": 2,
@@ -258,6 +265,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -299,6 +307,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -335,6 +344,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
+ "partition_key": None,
},
{
"id": 2,
@@ -351,6 +361,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
{
"id": 1,
@@ -367,6 +378,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -403,6 +415,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -439,6 +452,7 @@ class TestGetAssetEventByAsset:
},
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
+ "partition_key": None,
},
]
}
@@ -469,6 +483,7 @@ class TestGetAssetEventByAssetAlias:
},
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
+ "partition_key": None,
},
{
"id": 2,
@@ -485,6 +500,7 @@ class TestGetAssetEventByAssetAlias:
},
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
+ "partition_key": None,
},
{
"id": 3,
@@ -501,6 +517,7 @@ class TestGetAssetEventByAssetAlias:
},
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
+ "partition_key": None,
},
]
}
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index a51dd9efc4a..c410d4e4733 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -199,6 +199,7 @@ class TestTIRunState:
"conf": {},
"triggering_user_name": None,
"consumed_asset_events": [],
+ "partition_key": None,
},
"task_reschedule_count": 0,
"upstream_map_indexes": {},
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_asset_events.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_asset_events.py
new file mode 100644
index 00000000000..da5f2568334
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_asset_events.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel
+
+DEFAULT_DATE = timezone.parse("2021-01-01T00:00:00")
+
+pytestmark = pytest.mark.db_test
+
+
+@pytest.fixture
+def ver_client(client):
+ client.headers["Airflow-API-Version"] = "2025-09-23"
+ return client
+
+
+@pytest.fixture
+def test_asset_events(session):
+ def make_timestamp(day):
+ return datetime(2021, 1, day, tzinfo=timezone.utc)
+
+ common = {
+ "asset_id": 1,
+ "extra": {"foo": "bar"},
+ "source_dag_id": "foo",
+ "source_task_id": "bar",
+ "source_run_id": "custom",
+ "source_map_index": -1,
+ "partition_key": None,
+ }
+
+    events = [AssetEvent(id=i, timestamp=make_timestamp(i), **common) for i in (1, 2, 3)]
+ session.add_all(events)
+ session.commit()
+ yield events
+
+ for event in events:
+ session.delete(event)
+ session.commit()
+
+
+@pytest.fixture
+def test_asset(session):
+ asset = AssetModel(
+ id=1,
+ name="test_get_asset_by_name",
+ uri="s3://bucket/key",
+ group="asset",
+ extra={"foo": "bar"},
+ created_at=DEFAULT_DATE,
+ updated_at=DEFAULT_DATE,
+ )
+ asset_active = AssetActive.for_asset(asset)
+ session.add_all([asset, asset_active])
+ session.commit()
+
+ yield asset
+
+ session.delete(asset)
+ session.delete(asset_active)
+ session.commit()
+
+
+@pytest.fixture
+def test_asset_alias(session, test_asset_events, test_asset):
+ alias = AssetAliasModel(id=1, name="test_alias")
+ alias.asset_events = test_asset_events
+ alias.assets.append(test_asset)
+ session.add(alias)
+ session.commit()
+
+ yield alias
+
+ session.delete(alias)
+ session.commit()
+
+
+class TestGetAssetEventByAsset:
+ @pytest.mark.parametrize(
+ ("uri", "name"),
+ [
+ (None, "test_get_asset_by_name"),
+ ("s3://bucket/key", None),
+ ("s3://bucket/key", "test_get_asset_by_name"),
+ ],
+ )
+ @pytest.mark.usefixtures("test_asset", "test_asset_events")
+ def test_should_not_have_partition_key(self, uri, name, ver_client):
+ response = ver_client.get(
+ "/execution/asset-events/by-asset",
+ params={"name": name, "uri": uri},
+ )
+ assert response.status_code == 200
+ assert response.json() == {
+ "asset_events": [
+ {
+ "id": 1,
+ "extra": {"foo": "bar"},
+ "source_task_id": "bar",
+ "source_dag_id": "foo",
+ "source_run_id": "custom",
+ "source_map_index": -1,
+ "created_dagruns": [],
+ "asset": {
+ "extra": {"foo": "bar"},
+ "group": "asset",
+ "name": "test_get_asset_by_name",
+ "uri": "s3://bucket/key",
+ },
+ "timestamp": "2021-01-01T00:00:00Z",
+ },
+ {
+ "id": 2,
+ "extra": {"foo": "bar"},
+ "source_task_id": "bar",
+ "source_dag_id": "foo",
+ "source_run_id": "custom",
+ "source_map_index": -1,
+ "asset": {
+ "extra": {"foo": "bar"},
+ "group": "asset",
+ "name": "test_get_asset_by_name",
+ "uri": "s3://bucket/key",
+ },
+ "created_dagruns": [],
+ "timestamp": "2021-01-02T00:00:00Z",
+ },
+ {
+ "id": 3,
+ "extra": {"foo": "bar"},
+ "source_task_id": "bar",
+ "source_dag_id": "foo",
+ "source_run_id": "custom",
+ "source_map_index": -1,
+ "asset": {
+ "extra": {"foo": "bar"},
+ "group": "asset",
+ "name": "test_get_asset_by_name",
+ "uri": "s3://bucket/key",
+ },
+ "created_dagruns": [],
+ "timestamp": "2021-01-03T00:00:00Z",
+ },
+ ]
+ }
diff --git a/airflow-core/tests/unit/callbacks/test_callback_requests.py
b/airflow-core/tests/unit/callbacks/test_callback_requests.py
index d6bf66b8d77..210b8a6edcc 100644
--- a/airflow-core/tests/unit/callbacks/test_callback_requests.py
+++ b/airflow-core/tests/unit/callbacks/test_callback_requests.py
@@ -141,6 +141,7 @@ class TestDagRunContext:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -179,6 +180,7 @@ class TestDagRunContext:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -219,6 +221,7 @@ class TestDagCallbackRequestWithContext:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -277,6 +280,7 @@ class TestDagCallbackRequestWithContext:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -350,6 +354,7 @@ class TestEmailRequest:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
@@ -397,6 +402,7 @@ class TestEmailRequest:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
)
diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py b/airflow-core/tests/unit/cli/commands/test_asset_command.py
index 22a374e1ab6..e0cf0b1364e 100644
--- a/airflow-core/tests/unit/cli/commands/test_asset_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py
@@ -154,6 +154,7 @@ def test_cli_assets_materialize(mock_hasattr, parser: ArgumentParser, stdout_cap
"duration": None,
"last_scheduling_decision": None,
"note": None,
+ "partition_key": None,
"run_type": "manual",
"start_date": None,
"state": "queued",
@@ -192,6 +193,7 @@ def test_cli_assets_materialize_with_view_url_template(parser: ArgumentParser, s
"duration": None,
"last_scheduling_decision": None,
"note": None,
+ "partition_key": None,
"run_type": "manual",
"start_date": None,
"state": "queued",
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index e386ba85ec4..099b14e2511 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -702,6 +702,7 @@ class TestExecuteDagCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -821,6 +822,7 @@ class TestExecuteDagCallbacks:
run_type="manual",
state="success",
consumed_asset_events=[],
+ partition_key=None,
)
ti_data = TIDataModel(
@@ -1042,6 +1044,7 @@ class TestExecuteDagCallbacks:
run_type="manual",
state="success",
consumed_asset_events=[],
+ partition_key=None,
),
last_ti=TIDataModel(
id=uuid.uuid4(),
@@ -1133,6 +1136,7 @@ class TestExecuteDagCallbacks:
run_type="manual",
state="success",
consumed_asset_events=[],
+ partition_key=None,
),
last_ti=TIDataModel(
id=uuid.uuid4(),
@@ -1504,6 +1508,7 @@ class TestExecuteEmailCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
@@ -1565,6 +1570,7 @@ class TestExecuteEmailCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
@@ -1623,6 +1629,7 @@ class TestExecuteEmailCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
@@ -1681,6 +1688,7 @@ class TestExecuteEmailCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
@@ -1757,6 +1765,7 @@ class TestExecuteEmailCallbacks:
run_type="manual",
state="running",
consumed_asset_events=[],
+ partition_key=None,
),
max_tries=2,
),
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 2bf729114b8..f400a674f13 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -255,6 +255,7 @@ class CreateAssetEventsBody(BaseModel):
extra="forbid",
)
asset_id: Annotated[int, Field(title="Asset Id")]
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
@@ -873,6 +874,7 @@ class TriggerDAGRunPostBody(BaseModel):
run_after: Annotated[datetime | None, Field(title="Run After")] = None
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
note: Annotated[str | None, Field(title="Note")] = None
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class TriggerResponse(BaseModel):
@@ -1045,6 +1047,7 @@ class AssetEventResponse(BaseModel):
source_map_index: Annotated[int, Field(title="Source Map Index")]
created_dagruns: Annotated[list[DagRunAssetReference], Field(title="Created Dagruns")]
timestamp: Annotated[datetime, Field(title="Timestamp")]
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class AssetResponse(BaseModel):
@@ -1396,6 +1399,7 @@ class DAGRunResponse(BaseModel):
dag_versions: Annotated[list[DagVersionResponse], Field(title="Dag Versions")]
bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None
dag_display_name: Annotated[str, Field(title="Dag Display Name")]
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class DAGRunsBatchBody(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index ca2dceec3b0..3b263416127 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -27,7 +27,7 @@ from uuid import UUID
from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, RootModel
-API_VERSION: Final[str] = "2025-11-05"
+API_VERSION: Final[str] = "2025-11-07"
class AssetAliasReferenceAssetEventDagRun(BaseModel):
@@ -561,6 +561,7 @@ class AssetEventResponse(BaseModel):
source_dag_id: Annotated[str | None, Field(title="Source Dag Id")] = None
source_run_id: Annotated[str | None, Field(title="Source Run Id")] = None
source_map_index: Annotated[int | None, Field(title="Source Map Index")] = None
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class AssetEventsResponse(BaseModel):
@@ -593,6 +594,7 @@ class DagRun(BaseModel):
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None
consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")]
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
class TIRunContext(BaseModel):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 0b2b2ee6bc3..67ed4a12e3f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2069,6 +2069,7 @@ REQUEST_TEST_CASES = [
"dag_id": "test_dag",
"run_id": "prev_run",
"logical_date": timezone.parse("2024-01-14T12:00:00Z"),
+ "partition_key": None,
"run_type": "scheduled",
"start_date": timezone.parse("2024-01-15T12:00:00Z"),
"run_after": timezone.parse("2024-01-15T12:00:00Z"),