jroachgolf84 commented on code in PR #67839:
URL: https://github.com/apache/airflow/pull/67839#discussion_r3343916715


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########


Review Comment:
   I don't think so? What would that implementation look like?



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1266,6 +1269,16 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets:
+                # Reconstruct AssetStateAccessors from watched_assets
+                from airflow.sdk.definitions.asset import Asset
+                from airflow.sdk.execution_time.context import AssetStateAccessors

Review Comment:
   Hmmm, not seeing that.



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets:
+                # Reconstruct AssetStateAccessors from watched_assets
+                from airflow.sdk.definitions.asset import Asset
+                from airflow.sdk.execution_time.context import AssetStoreAccessors

Review Comment:
   Moved to top-level.



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -741,11 +737,18 @@ def _create_workload(
         render_log_fname: Callable[..., str],
         session: Session,
     ) -> workloads.RunTrigger | None:
+        # Pass the "watched" Assets through for downstream use in BaseEventTrigger
         if trigger.task_instance is None:
+            watched_assets: dict[str, str] | None = None
+
+            if trigger.asset_watchers:
+                watched_assets = {a.name: a.uri for a in trigger.assets}

Review Comment:
   Resolved in next commit.



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor, mocker):

Review Comment:
   Implemented for tests in this PR.



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets:
+                # Reconstruct AssetStateAccessors from watched_assets
+                from airflow.sdk.definitions.asset import Asset
+                from airflow.sdk.execution_time.context import AssetStoreAccessors
+
+                # Potentially address Asset vs. AssetRef, AssetUriRef, etc.

Review Comment:
   No, removing comment.



##########
airflow-core/tests/unit/triggers/test_base_trigger.py:
##########
@@ -232,3 +234,25 @@ async def stream():
 
     payloads = [event.payload async for event in us.filter_shared_stream(stream())]
     assert [p["region"] for p in payloads] == ["us", "us"]
+
+
+def test_base_event_trigger_asset_state_store_initialized_to_none():
+    """asset_state_store is None before it is set."""
+    trigger = _PlainEventTrigger()
+    assert trigger.asset_state_store is None
+
+
+def test_base_event_trigger_asset_state_store_can_be_set():
+    """asset_state_store can be set once the Trigger is initialized."""
+    trigger = _PlainEventTrigger()
+    mock_store = MagicMock()
+    trigger.asset_state_store = mock_store
+    assert trigger.asset_state_store is mock_store
+
+
+def test_base_event_trigger_asset_state_store_independent_across_instances():
+    """a.asset_state_store does not impact b.asset_state_store."""
+    a = _PlainEventTrigger(name="a")
+    b = _PlainEventTrigger(name="b")
+    a.asset_state_store = MagicMock()
+    assert b.asset_state_store is None

Review Comment:
   Resolved in next commit.



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):
+    """_create_workload() should populate watched_assets when 
trigger.task_instance is None and assets exist."""
+    asset1 = mocker.Mock()
+    asset1.name = "my_asset"
+    asset1.uri = "s3://bucket/key"
+
+    asset2 = mocker.Mock()
+    asset2.name = "other_asset"
+    asset2.uri = "gs://bucket/path"
+
+    trigger = mocker.Mock()
+    trigger.id = 42
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None  # Not tied to a Task (similar to a 
BaseEventTrigger)
+    trigger.assets = [asset1, asset2]
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets == {"my_asset": "s3://bucket/key", 
"other_asset": "gs://bucket/path"}
+
+
+def 
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor, 
mocker):
+    """_create_workload() should set watched_assets=None when 
trigger.task_instance is None and assets is empty."""
+    trigger = mocker.Mock()
+    trigger.id = 43
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None
+    trigger.assets = []  # No Assets are attached to the trigger
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+    """RunTrigger workload should accept and store watched_assets."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+        watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+    )
+    assert workload.watched_assets == {"asset_a": "s3://a", "asset_b": 
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+    """RunTrigger workload watched_assets should default to None."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+    )
+    assert workload.watched_assets is None
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store is populated on BaseEventTrigger instances when 
watched_assets is set."""
+    from airflow.sdk.execution_time.context import AssetStoreAccessors
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+    injected_instances = []
+
+    class _WatcherTrigger(BaseEventTrigger):
+        def __init__(self, **kwargs):
+            super().__init__(**kwargs)
+            injected_instances.append(self)
+
+        def serialize(self):
+            return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+        async def run(self):
+            yield TriggerEvent("done")
+
+    mock_get_classpath.return_value = _WatcherTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=10,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="fake",
+            watched_assets={"my_asset": "s3://bucket/key"},
+        )
+    )
+
+    await runner.create_triggers()
+
+    # This is only testing that an exception was NOT thrown when creating the 
Trigger
+    assert 10 in runner.triggers
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_store is not None
+    assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+    runner.triggers[10]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store stays None when watched_assets is not set on the 
workload."""
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+    injected_instances = []
+
+    class _WatcherTrigger(BaseEventTrigger):
+        def __init__(self, **kwargs):
+            super().__init__(**kwargs)
+            injected_instances.append(self)
+
+        def serialize(self):
+            return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+        async def run(self):
+            yield TriggerEvent("done")
+
+    mock_get_classpath.return_value = _WatcherTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=11,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="fake",
+            watched_assets=None,
+        )
+    )
+
+    await runner.create_triggers()
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_store is None
+
+    runner.triggers[11]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_skips_asset_store_for_non_event_trigger(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store injection is skipped for plain BaseTrigger 
(non-BaseEventTrigger) instances."""
+    from airflow.triggers.testing import SuccessTrigger

Review Comment:
   Moved.



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and 
workload.watched_assets:
+                # Reconstruct AssetStateAccessors from watched_assets

Review Comment:
   Removed.



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):

Review Comment:
   Added test!



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):
+    """_create_workload() should populate watched_assets when 
trigger.task_instance is None and assets exist."""
+    asset1 = mocker.Mock()
+    asset1.name = "my_asset"
+    asset1.uri = "s3://bucket/key"
+
+    asset2 = mocker.Mock()
+    asset2.name = "other_asset"
+    asset2.uri = "gs://bucket/path"
+
+    trigger = mocker.Mock()
+    trigger.id = 42
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None  # Not tied to a Task (similar to a 
BaseEventTrigger)
+    trigger.assets = [asset1, asset2]
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets == {"my_asset": "s3://bucket/key", 
"other_asset": "gs://bucket/path"}
+
+
+def 
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor, 
mocker):
+    """_create_workload() should set watched_assets=None when 
trigger.task_instance is None and assets is empty."""
+    trigger = mocker.Mock()
+    trigger.id = 43
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None
+    trigger.assets = []  # No Assets are attached to the trigger
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+    """RunTrigger workload should accept and store watched_assets."""
+    from airflow.executors.workloads.trigger import RunTrigger

Review Comment:
   Moved.



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):
+    """_create_workload() should populate watched_assets when 
trigger.task_instance is None and assets exist."""
+    asset1 = mocker.Mock()
+    asset1.name = "my_asset"
+    asset1.uri = "s3://bucket/key"
+
+    asset2 = mocker.Mock()
+    asset2.name = "other_asset"
+    asset2.uri = "gs://bucket/path"
+
+    trigger = mocker.Mock()
+    trigger.id = 42
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None  # Not tied to a Task (similar to a 
BaseEventTrigger)
+    trigger.assets = [asset1, asset2]
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets == {"my_asset": "s3://bucket/key", 
"other_asset": "gs://bucket/path"}
+
+
+def 
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor, 
mocker):
+    """_create_workload() should set watched_assets=None when 
trigger.task_instance is None and assets is empty."""
+    trigger = mocker.Mock()
+    trigger.id = 43
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None
+    trigger.assets = []  # No Assets are attached to the trigger
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+    """RunTrigger workload should accept and store watched_assets."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+        watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+    )
+    assert workload.watched_assets == {"asset_a": "s3://a", "asset_b": 
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+    """RunTrigger workload watched_assets should default to None."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+    )
+    assert workload.watched_assets is None
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store is populated on BaseEventTrigger instances when 
watched_assets is set."""
+    from airflow.sdk.execution_time.context import AssetStoreAccessors
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+    injected_instances = []
+
+    class _WatcherTrigger(BaseEventTrigger):
+        def __init__(self, **kwargs):
+            super().__init__(**kwargs)
+            injected_instances.append(self)
+
+        def serialize(self):
+            return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+        async def run(self):
+            yield TriggerEvent("done")
+
+    mock_get_classpath.return_value = _WatcherTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=10,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="fake",
+            watched_assets={"my_asset": "s3://bucket/key"},
+        )
+    )
+
+    await runner.create_triggers()
+
+    # This is only testing that an exception was NOT thrown when creating the 
Trigger
+    assert 10 in runner.triggers
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_store is not None
+    assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+    runner.triggers[10]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store stays None when watched_assets is not set on the 
workload."""
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent

Review Comment:
   Moved.



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):
+    """_create_workload() should populate watched_assets when 
trigger.task_instance is None and assets exist."""
+    asset1 = mocker.Mock()
+    asset1.name = "my_asset"
+    asset1.uri = "s3://bucket/key"
+
+    asset2 = mocker.Mock()
+    asset2.name = "other_asset"
+    asset2.uri = "gs://bucket/path"
+
+    trigger = mocker.Mock()
+    trigger.id = 42
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None  # Not tied to a Task (similar to a 
BaseEventTrigger)
+    trigger.assets = [asset1, asset2]
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets == {"my_asset": "s3://bucket/key", 
"other_asset": "gs://bucket/path"}
+
+
+def 
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor, 
mocker):
+    """_create_workload() should set watched_assets=None when 
trigger.task_instance is None and assets is empty."""
+    trigger = mocker.Mock()
+    trigger.id = 43
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None
+    trigger.assets = []  # No Assets are attached to the trigger
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+    """RunTrigger workload should accept and store watched_assets."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+        watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+    )
+    assert workload.watched_assets == {"asset_a": "s3://a", "asset_b": 
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+    """RunTrigger workload watched_assets should default to None."""
+    from airflow.executors.workloads.trigger import RunTrigger
+
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+    )
+    assert workload.watched_assets is None
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store is populated on BaseEventTrigger instances when 
watched_assets is set."""
+    from airflow.sdk.execution_time.context import AssetStoreAccessors
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+    injected_instances = []
+
+    class _WatcherTrigger(BaseEventTrigger):
+        def __init__(self, **kwargs):
+            super().__init__(**kwargs)
+            injected_instances.append(self)
+
+        def serialize(self):
+            return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+        async def run(self):
+            yield TriggerEvent("done")
+
+    mock_get_classpath.return_value = _WatcherTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=10,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="fake",
+            watched_assets={"my_asset": "s3://bucket/key"},
+        )
+    )
+
+    await runner.create_triggers()
+
+    # This is only testing that an exception was NOT thrown when creating the 
Trigger
+    assert 10 in runner.triggers
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_store is not None
+    assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+    runner.triggers[10]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store stays None when watched_assets is not set on the 
workload."""
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+    injected_instances = []
+
+    class _WatcherTrigger(BaseEventTrigger):
+        def __init__(self, **kwargs):
+            super().__init__(**kwargs)
+            injected_instances.append(self)
+
+        def serialize(self):
+            return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+        async def run(self):
+            yield TriggerEvent("done")
+
+    mock_get_classpath.return_value = _WatcherTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=11,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="fake",
+            watched_assets=None,
+        )
+    )
+
+    await runner.create_triggers()
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_store is None
+
+    runner.triggers[11]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_skips_asset_store_for_non_event_trigger(
+    mock_get_classpath, mock_decrypt, session
+):
+    """asset_store injection is skipped for plain BaseTrigger 
(non-BaseEventTrigger) instances."""
+    from airflow.triggers.testing import SuccessTrigger
+
+    mock_get_classpath.return_value = SuccessTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=12, ti=None, 
classpath="airflow.triggers.testing.SuccessTrigger", encrypted_kwargs="fake"
+        )
+    )
+
+    await runner.create_triggers()
+
+    assert 12 in runner.triggers
+    assert not hasattr(runner.triggers[12]["task"], "asset_store")
+
+    runner.triggers[12]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs", 
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def 
test_create_triggers_asset_store_contains_correct_assets(mock_get_classpath, 
mock_decrypt, session):
+    """AssetStoreAccessors built from watched_assets has entries for all 
provided name/URI pairs."""
+    from airflow.sdk.definitions.asset import Asset
+    from airflow.sdk.execution_time.context import AssetStoreAccessors
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent

Review Comment:
   Moved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to