This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 05f00d654ae Cleaning up asset_state references in task sdk (#68429)
05f00d654ae is described below

commit 05f00d654ae1dad35c3ab04bd6e4ec2ee02a4745
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 12:11:35 2026 +0530

    Cleaning up asset_state references in task sdk (#68429)
---
 .../task_sdk/execution_time/test_task_runner.py    | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 2e87b4139d9..8c1312833e8 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5678,7 +5678,7 @@ class TestTaskInstanceStateOperations:
             return AssetResult(name=actual.uri, uri=actual.uri, group="asset")
         return OKResponse(ok=True)
 
-    def test_asset_state_get_and_set(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_get_and_set(self, create_runtime_ti, mock_supervisor_comms):
         watched = Asset(name="my_asset", uri="s3://bucket/data")
 
         class WatcherOperator(BaseOperator):
@@ -5697,7 +5697,9 @@ class TestTaskInstanceStateOperations:
         )
         mock_supervisor_comms.send.assert_any_call(GetAssetStateStoreByName(name="my_asset", key="watermark"))
 
-    def test_asset_state_get_returns_default_when_key_missing(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_get_returns_default_when_key_missing(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
         watched = Asset(name="my_asset", uri="s3://bucket/data")
         captured = {}
 
@@ -5718,7 +5720,7 @@ class TestTaskInstanceStateOperations:
 
         assert captured["result"] == "2026-01-01T00:00:00+00:00"
 
-    def test_asset_state_delete(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_delete(self, create_runtime_ti, mock_supervisor_comms):
         watched = Asset(name="my_asset", uri="s3://bucket/data")
 
         class WatcherOperator(BaseOperator):
@@ -5735,7 +5737,7 @@ class TestTaskInstanceStateOperations:
             DeleteAssetStateStoreByName(name="my_asset", key="watermark")
         )
 
-    def test_asset_state_clear(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_clear(self, create_runtime_ti, mock_supervisor_comms):
         watched = Asset(name="my_asset", uri="s3://bucket/data")
 
         class WatcherOperator(BaseOperator):
@@ -5750,7 +5752,7 @@ class TestTaskInstanceStateOperations:
 
         mock_supervisor_comms.send.assert_any_call(ClearAssetStateStoreByName(name="my_asset"))
 
-    def test_asset_state_uri_ref_inlet(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_uri_ref_inlet(self, create_runtime_ti, mock_supervisor_comms):
         watched = AssetUriRef(uri="s3://bucket/data")
 
         class WatcherOperator(BaseOperator):
@@ -5771,7 +5773,7 @@ class TestTaskInstanceStateOperations:
             GetAssetStateStoreByUri(uri="s3://bucket/data", key="watermark")
         )
 
-    def test_asset_state_alias_as_inlet(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_alias_as_inlet(self, create_runtime_ti, mock_supervisor_comms):
         alias = AssetAlias(name="my_alias")
         resolved = Asset(name="resolved_asset", uri="s3://bucket/resolved")
 
@@ -5796,7 +5798,7 @@ class TestTaskInstanceStateOperations:
             SetAssetStateStoreByName(name="resolved_asset", key="watermark", value="2026-05-01")
         )
 
-    def test_asset_state_alias_inlet_no_resolved_assets(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_alias_inlet_no_resolved_assets(self, create_runtime_ti, mock_supervisor_comms):
         alias = AssetAlias(name="empty_alias")
 
         class WatcherOperator(BaseOperator):
@@ -5815,7 +5817,7 @@ class TestTaskInstanceStateOperations:
 
         run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock())
 
-    def test_asset_state_keyed_access_single_inlet(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_keyed_access_single_inlet(self, create_runtime_ti, mock_supervisor_comms):
         watched = Asset(name="my_asset", uri="s3://bucket/data")
 
         class WatcherOperator(BaseOperator):
@@ -5833,7 +5835,7 @@ class TestTaskInstanceStateOperations:
             SetAssetStateStoreByName(name="my_asset", key="watermark", value="2026-05-01")
         )
 
-    def test_asset_state_multi_inlet(self, create_runtime_ti, mock_supervisor_comms):
+    def test_asset_state_store_multi_inlet(self, create_runtime_ti, mock_supervisor_comms):
         asset_a = Asset(name="asset_a", uri="s3://bucket/a")
         asset_b = Asset(name="asset_b", uri="s3://bucket/b")
 
@@ -5855,7 +5857,7 @@ class TestTaskInstanceStateOperations:
             SetAssetStateStoreByName(name="asset_b", key="watermark_b", value="2026-05-02")
         )
 
-    def test_asset_state_set_sends_reference_via_custom_backend(
+    def test_asset_state_store_set_sends_reference_via_custom_backend(
         self, create_runtime_ti, mock_supervisor_comms
     ):
         """When a worker backend is configured, asset state set() sends a reference, not the actual value."""

Reply via email to