kaxil commented on code in PR #67839:
URL: https://github.com/apache/airflow/pull/67839#discussion_r3439491208


##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -561,6 +571,255 @@ def 
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
     assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
 
 
+def 
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
 mocker):
+    """_create_workload() should populate watched_assets when 
trigger.task_instance is None and assets exist."""
+    asset1 = mocker.Mock(spec=Asset)
+    asset1.name = "my_asset"
+    asset1.uri = "s3://bucket/key"
+
+    asset2 = mocker.Mock(spec=Asset)
+    asset2.name = "other_asset"
+    asset2.uri = "gs://bucket/path"
+
+    trigger = mocker.Mock(spec=BaseEventTrigger)
+    trigger.id = 42
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None  # Not tied to a Task (similar to a 
BaseEventTrigger)
+    trigger.assets = [asset1, asset2]
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets == {"my_asset": "s3://bucket/key", 
"other_asset": "gs://bucket/path"}
+
+
+def 
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor, 
mocker):
+    """_create_workload() should set watched_assets=None when 
trigger.task_instance is None and assets is empty."""
+    trigger = mocker.Mock(spec=BaseEventTrigger)
+    trigger.id = 43
+    trigger.classpath = "some.path.Trigger"
+    trigger.encrypted_kwargs = "encrypted"
+    trigger.task_instance = None
+    trigger.assets = []  # No Assets are attached to the trigger
+
+    workload = jobless_supervisor._create_workload(
+        trigger=trigger,
+        dag_bag=mocker.Mock(),
+        render_log_fname=mocker.Mock(),
+        session=mocker.Mock(),
+    )
+
+    assert workload is not None
+    assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+    """RunTrigger workload should accept and store watched_assets."""
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+        watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+    )
+    assert workload.watched_assets == {"asset_a": "s3://a", "asset_b": 
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+    """RunTrigger workload watched_assets should default to None."""
+    workload = RunTrigger(
+        id=1,
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        encrypted_kwargs="fake",
+    )
+    assert workload.watched_assets is None
+
+
+@pytest.fixture
+def make_watcher_trigger():
+    """Factory fixture: call with a list to get a BaseEventTrigger subclass 
that appends each new instance."""
+
+    def factory(injected_instances):
+        class WatcherTrigger(BaseEventTrigger):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+                injected_instances.append(self)
+
+            def serialize(self):
+                return (f"{type(self).__module__}.{type(self).__qualname__}", 
{})
+
+            async def run(self):
+                yield TriggerEvent("done")
+
+        return WatcherTrigger
+
+    return factory
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def 
test_create_triggers_injects_asset_state_store_for_base_event_trigger(
+    mock_get_classpath, session, make_watcher_trigger
+):
+    """asset_state_store is populated on BaseEventTrigger instances when 
watched_assets is set."""
+    injected_instances = []
+    mock_get_classpath.return_value = make_watcher_trigger(injected_instances)
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=10,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="{}",
+            watched_assets={"my_asset": "s3://bucket/key"},
+        )
+    )
+
+    await runner.create_triggers()
+
+    # This is only testing that an exception was NOT thrown when creating the 
Trigger
+    assert 10 in runner.triggers
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_state_store is not None
+    assert isinstance(injected_instances[0].asset_state_store, 
AssetStateStoreAccessors)
+
+    runner.triggers[10]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_state_store_none_when_no_watched_assets(
+    mock_get_classpath, session, make_watcher_trigger
+):
+    """asset_state_store stays None when watched_assets is not set on the 
workload."""
+    injected_instances = []
+    mock_get_classpath.return_value = make_watcher_trigger(injected_instances)
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=11,
+            ti=None,
+            classpath="fake.WatcherTrigger",
+            encrypted_kwargs="{}",
+            watched_assets=None,
+        )
+    )
+
+    await runner.create_triggers()
+
+    assert len(injected_instances) == 1
+    assert injected_instances[0].asset_state_store is None
+
+    runner.triggers[11]["task"].cancel()
+    await runner.cleanup_finished_triggers()
+
+
+@pytest.mark.asyncio
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def 
test_create_triggers_skips_asset_state_store_for_non_event_trigger(mock_get_classpath,
 session):
+    """asset_state_store injection is skipped for plain BaseTrigger 
(non-BaseEventTrigger) instances."""
+    mock_get_classpath.return_value = SuccessTrigger
+
+    runner = TriggerRunner()
+    runner.to_create.append(
+        workloads.RunTrigger.model_construct(
+            id=12, ti=None, 
classpath="airflow.triggers.testing.SuccessTrigger", encrypted_kwargs="{}"
+        )
+    )
+
+    await runner.create_triggers()
+
+    assert 12 in runner.triggers
+    assert not hasattr(runner.triggers[12]["task"], "asset_state_store")

Review Comment:
   `runner.triggers[12]["task"]` is the asyncio `Task`, not the trigger 
instance, so this assertion is vacuously true: a `Task` never has an 
`asset_state_store` attribute regardless of the skip logic. To actually verify 
the non-event-trigger path is skipped, capture the trigger instance (like the 
`make_watcher_trigger` fixture does for the other tests) and assert `not 
hasattr(instance, "asset_state_store")` on it.
   



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1313,6 +1321,11 @@ async def create_triggers(self):
             trigger_instance.triggerer_job_id = self.job_id
             trigger_instance.timeout_after = workload.timeout_after
 
+            if isinstance(trigger_instance, BaseEventTrigger) and 
workload.watched_assets:

Review Comment:
   Following up on the `_handle_request` thread above (marked resolved, but I 
don't think the runtime gap was actually closed out): the accessor injected 
here can't read or write at runtime. `AssetStateStoreAccessor.get()`/`.set()` 
send `GetAssetStateStoreByName`/`SetAssetStateStoreByName` over 
`SUPERVISOR_COMMS`, but the triggerer supervisor's `_handle_request` (line 543) 
has no branch for those message types and `ToTriggerSupervisor` (line 367) 
doesn't include them, so a real `self.asset_state_store[asset].get("key")` call 
lands on the `else: raise ValueError(f"Unknown message type ...")` fallthrough 
(line 626).
   
   `test_create_triggers_asset_state_store_accessor_reads_and_writes` only 
mocks `SUPERVISOR_COMMS.send` and asserts the outgoing message, so it never 
exercises the supervisor side and stays green while the round trip is broken.
   
   Was the get/set wiring meant to land in a follow-up? If so, worth calling 
out in the description that the accessor is injected but not yet functional. 
(Separately, these `send()` calls are synchronous inside `async def run()`, 
which the `init_comms` docstring at line 1229 flags as needing a 
`sync_to_async()` wrapper.)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to