This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76fe18d9883 Adding Task SDK integration tests for asset operations (#59000)
76fe18d9883 is described below
commit 76fe18d9883d55f7f1ed0e5fcf6dcc2cbc704181
Author: Henry Chen <[email protected]>
AuthorDate: Thu Dec 4 01:16:04 2025 +0800
Adding Task SDK integration tests for asset operations (#59000)
---
.../tests/task_sdk_tests/test_asset_operations.py | 36 +++++++++++++++++-----
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git a/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_operations.py b/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_operations.py
index 9e3a65e165c..aa1478da1ef 100644
--- a/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_operations.py
+++ b/task-sdk-integration-tests/tests/task_sdk_tests/test_asset_operations.py
@@ -24,8 +24,6 @@ These tests validate the Execution API endpoints for Asset operations:
from __future__ import annotations
-import pytest
-
from airflow.sdk.api.datamodels._generated import AssetResponse
from airflow.sdk.execution_time.comms import ErrorResponse
from task_sdk_tests import console
@@ -67,7 +65,6 @@ def test_asset_get_by_name_not_found(sdk_client_for_assets):
console.print("[green]Asset get by name (not found) test passed!")
-@pytest.mark.skip(reason="TODO: Implement Asset get_by_uri test")
def test_asset_get_by_uri(sdk_client_for_assets, asset_test_setup):
"""
Test getting asset by URI.
@@ -75,11 +72,23 @@ def test_asset_get_by_uri(sdk_client_for_assets, asset_test_setup):
Expected: AssetResponse with asset details
Endpoint: GET /execution/assets/by-uri?uri={uri}
"""
- console.print("[yellow]TODO: Implement test_asset_get_by_uri")
- raise NotImplementedError("test_asset_get_by_uri not implemented")
+ console.print("[yellow]Getting asset by URI...")
+
+ response = sdk_client_for_assets.assets.get(uri=asset_test_setup["uri"])
+
+ console.print(" Asset Get By URI Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Name:[/] {response.name}")
+ console.print(f"[bright_blue]URI:[/] {response.uri}")
+ console.print(f"[bright_blue]Group:[/] {response.group}")
+ console.print("=" * 72)
+
+ assert isinstance(response, AssetResponse)
+ assert response.name == asset_test_setup["name"]
+ assert response.uri == asset_test_setup["uri"]
+ console.print("[green]Asset get by URI test passed!")
-@pytest.mark.skip(reason="TODO: Implement Asset get_by_uri (not found) test")
def test_asset_get_by_uri_not_found(sdk_client_for_assets):
"""
Test getting non-existent asset by URI.
@@ -87,5 +96,16 @@ def test_asset_get_by_uri_not_found(sdk_client_for_assets):
Expected: ErrorResponse with ASSET_NOT_FOUND error
Endpoint: GET /execution/assets/by-uri?uri={uri}
"""
- console.print("[yellow]TODO: Implement test_asset_get_by_uri_not_found")
- raise NotImplementedError("test_asset_get_by_uri_not_found not implemented")
+ console.print("[yellow]Getting non-existent asset by URI...")
+
+ response = sdk_client_for_assets.assets.get(uri="non_existent_asset_uri")
+
+ console.print(" Asset Get (Not Found) Response ".center(72, "="))
+ console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
+ console.print(f"[bright_blue]Error Type:[/] {response.error}")
+ console.print(f"[bright_blue]Detail:[/] {response.detail}")
+ console.print("=" * 72)
+
+ assert isinstance(response, ErrorResponse)
+ assert str(response.error).endswith("ASSET_NOT_FOUND")
+ console.print("[green]Asset get by URI (not found) test passed!")