This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 087c656cf2a Completing stubbed tests for AssetEvent operations (#59182)
087c656cf2a is described below
commit 087c656cf2a31c1bdd809d4585ac5155a5694771
Author: Jake Roach <[email protected]>
AuthorDate: Tue Dec 9 00:54:22 2025 -0500
Completing stubbed tests for AssetEvent operations (#59182)
---
task-sdk-integration-tests/dags/test_asset_dag.py | 11 +++--
.../tests/task_sdk_tests/conftest.py | 1 +
.../task_sdk_tests/test_asset_event_operations.py | 56 ++++++++++++++++++----
3 files changed, 56 insertions(+), 12 deletions(-)
diff --git a/task-sdk-integration-tests/dags/test_asset_dag.py b/task-sdk-integration-tests/dags/test_asset_dag.py
index 4ae28268ffc..13cdb46cf34 100644
--- a/task-sdk-integration-tests/dags/test_asset_dag.py
+++ b/task-sdk-integration-tests/dags/test_asset_dag.py
@@ -25,9 +25,10 @@ This file contains:
from __future__ import annotations
-from airflow.sdk import DAG, Asset, task
+from airflow.sdk import DAG, Asset, AssetAlias, task
test_asset = Asset(uri="test://asset1", name="test_asset")
+test_asset_alias = AssetAlias(name="test_asset_alias")
with DAG(
dag_id="asset_producer_dag",
@@ -36,10 +37,14 @@ with DAG(
catchup=False,
) as producer_dag:
- @task(outlets=[test_asset])
- def produce_asset():
+ @task(outlets=[test_asset, test_asset_alias])
+ def produce_asset(**context):
"""Task that produces the test asset."""
print("Producing test asset")
+
+ # Ensure AssetAlias is associated with Asset
+ context["outlet_events"][test_asset_alias].add(test_asset)
+
return "asset_produced"
produce_asset()
diff --git a/task-sdk-integration-tests/tests/task_sdk_tests/conftest.py b/task-sdk-integration-tests/tests/task_sdk_tests/conftest.py
index f0ac9c167ed..95e4c86bbb6 100644
--- a/task-sdk-integration-tests/tests/task_sdk_tests/conftest.py
+++ b/task-sdk-integration-tests/tests/task_sdk_tests/conftest.py
@@ -737,5 +737,6 @@ def asset_test_setup(docker_compose_setup, airflow_ready):
additional_metadata={
"name": "test_asset",
"uri": "test://asset1/",
+ "alias_name": "test_asset_alias",
},
)
diff --git a/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_event_operations.py b/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_event_operations.py
index c1c43708428..2f4fd0f3d73 100644
--- a/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_event_operations.py
+++ b/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_event_operations.py
@@ -23,8 +23,6 @@ These tests validate the Execution API endpoints for Asset Event operations:
from __future__ import annotations
-import pytest
-
from airflow.sdk.api.datamodels._generated import AssetEventsResponse
from task_sdk_tests import console
@@ -68,7 +66,6 @@ def test_asset_event_get_not_found(sdk_client_for_assets):
console.print("[green]✅ Asset event get (not found) test passed!")
-@pytest.mark.skip(reason="TODO: Implement Asset Event get_by_uri test")
def test_asset_event_get_by_uri(sdk_client_for_assets, asset_test_setup):
"""
Test getting asset events by URI.
@@ -76,17 +73,58 @@ def test_asset_event_get_by_uri(sdk_client_for_assets, asset_test_setup):
Expected: AssetEventsResponse with events
Endpoint: GET /execution/asset-events/by-asset?uri={uri}
"""
- console.print("[yellow]TODO: Implement test_asset_event_get_by_uri")
- raise NotImplementedError("test_asset_event_get_by_uri not implemented")
+ console.print("[yellow]Getting asset events by URI...")
+
+ response = sdk_client_for_assets.asset_events.get(uri=asset_test_setup["uri"])
+
+ console.print(" Asset Event Get Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Number of Events:[/] {len(response.asset_events)}")
+
+ assert isinstance(response, AssetEventsResponse)
+ assert len(response.asset_events) >= 1
+
+ event = response.asset_events[0]
+
+ console.print(f"[bright_blue]First Event ID:[/] {event.id}")
+ console.print(f"[bright_blue]First Event Asset Name:[/] {event.asset.name}")
+ console.print(f"[bright_blue]First Event Asset URI:[/] {event.asset.uri}")
+ console.print(f"[bright_blue]First Event Timestamp:[/] {event.timestamp}")
+ console.print("=" * 72)
+
+ assert event.asset.name == asset_test_setup["name"]
+ assert event.asset.uri == asset_test_setup["uri"]
+
+ console.print("[green]✅ Asset event get (URI) test passed!")
-@pytest.mark.skip(reason="TODO: Implement Asset Event get_by_alias test")
-def test_asset_event_get_by_alias(sdk_client_for_assets):
+def test_asset_event_get_by_alias(sdk_client_for_assets, asset_test_setup):
"""
Test getting asset events by alias name.
Expected: AssetEventsResponse with events
Endpoint: GET /execution/asset-events/by-asset-alias?name={alias_name}
"""
- console.print("[yellow]TODO: Implement test_asset_event_get_by_alias")
- raise NotImplementedError("test_asset_event_get_by_alias not implemented")
+ console.print("[yellow]Getting asset events by alias...")
+
+ response = sdk_client_for_assets.asset_events.get(alias_name=asset_test_setup["alias_name"])
+
+ console.print(" Asset Event Get Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Number of Events:[/] {len(response.asset_events)}")
+
+ assert isinstance(response, AssetEventsResponse)
+ assert len(response.asset_events) >= 1
+
+ event = response.asset_events[0]
+
+ console.print(f"[bright_blue]First Event ID:[/] {event.id}")
+ console.print(f"[bright_blue]First Event Asset Name:[/] {event.asset.name}")
+ console.print(f"[bright_blue]First Event Asset URI:[/] {event.asset.uri}")
+ console.print(f"[bright_blue]First Event Timestamp:[/] {event.timestamp}")
+ console.print("=" * 72)
+
+ assert event.asset.name == asset_test_setup["name"]
+ assert event.asset.uri == asset_test_setup["uri"]
+
+ console.print("[green]✅ Asset event get (alias) test passed!")