jroachgolf84 commented on code in PR #67839:
URL: https://github.com/apache/airflow/pull/67839#discussion_r3343916715
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
Review Comment:
I don't think so? What would that implementation look like?
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1266,6 +1269,16 @@ async def create_triggers(self):
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.timeout_after = workload.timeout_after
+ if isinstance(trigger_instance, BaseEventTrigger) and
workload.watched_assets:
+ # Reconstruct AssetStateAccessors from watched_assets
+ from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk.execution_time.context import
AssetStateAccessors
Review Comment:
Hmmm, not seeing that.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.timeout_after = workload.timeout_after
+ if isinstance(trigger_instance, BaseEventTrigger) and
workload.watched_assets:
+ # Reconstruct AssetStateAccessors from watched_assets
+ from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk.execution_time.context import
AssetStoreAccessors
Review Comment:
Moved to top-level.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -741,11 +737,18 @@ def _create_workload(
render_log_fname: Callable[..., str],
session: Session,
) -> workloads.RunTrigger | None:
+ # Pass the "watched" Assets through for downstream use in
BaseEventTrigger
if trigger.task_instance is None:
+ watched_assets: dict[str, str] | None = None
+
+ if trigger.asset_watchers:
+ watched_assets = {a.name: a.uri for a in trigger.assets}
Review Comment:
Resolved in next commit.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
Review Comment:
Implemented for tests in this PR.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.timeout_after = workload.timeout_after
+ if isinstance(trigger_instance, BaseEventTrigger) and
workload.watched_assets:
+ # Reconstruct AssetStateAccessors from watched_assets
+ from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk.execution_time.context import
AssetStoreAccessors
+
+ # Potentially address Asset vs. AssetRef, AssetUriRef, etc.
Review Comment:
No, removing comment.
##########
airflow-core/tests/unit/triggers/test_base_trigger.py:
##########
@@ -232,3 +234,25 @@ async def stream():
payloads = [event.payload async for event in
us.filter_shared_stream(stream())]
assert [p["region"] for p in payloads] == ["us", "us"]
+
+
+def test_base_event_trigger_asset_state_store_initialized_to_none():
+ """asset_state_store is None before it is set."""
+ trigger = _PlainEventTrigger()
+ assert trigger.asset_state_store is None
+
+
+def test_base_event_trigger_asset_state_store_can_be_set():
+ """asset_state_store can be set once the Trigger is initialized."""
+ trigger = _PlainEventTrigger()
+ mock_store = MagicMock()
+ trigger.asset_state_store = mock_store
+ assert trigger.asset_state_store is mock_store
+
+
+def test_base_event_trigger_asset_state_store_independent_across_instances():
+ """a.asset_state_store does not impact b.asset_state_store."""
+ a = _PlainEventTrigger(name="a")
+ b = _PlainEventTrigger(name="b")
+ a.asset_state_store = MagicMock()
+ assert b.asset_state_store is None
Review Comment:
Resolved in next commit.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
+ """_create_workload() should populate watched_assets when
trigger.task_instance is None and assets exist."""
+ asset1 = mocker.Mock()
+ asset1.name = "my_asset"
+ asset1.uri = "s3://bucket/key"
+
+ asset2 = mocker.Mock()
+ asset2.name = "other_asset"
+ asset2.uri = "gs://bucket/path"
+
+ trigger = mocker.Mock()
+ trigger.id = 42
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None # Not tied to a Task (similar to a
BaseEventTrigger)
+ trigger.assets = [asset1, asset2]
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets == {"my_asset": "s3://bucket/key",
"other_asset": "gs://bucket/path"}
+
+
+def
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor,
mocker):
+ """_create_workload() should set watched_assets=None when
trigger.task_instance is None and assets is empty."""
+ trigger = mocker.Mock()
+ trigger.id = 43
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None
+ trigger.assets = [] # No Assets are attached to the trigger
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+ """RunTrigger workload should accept and store watched_assets."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+ )
+ assert workload.watched_assets == {"asset_a": "s3://a", "asset_b":
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+ """RunTrigger workload watched_assets should default to None."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ )
+ assert workload.watched_assets is None
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store is populated on BaseEventTrigger instances when
watched_assets is set."""
+ from airflow.sdk.execution_time.context import AssetStoreAccessors
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+ injected_instances = []
+
+ class _WatcherTrigger(BaseEventTrigger):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ injected_instances.append(self)
+
+ def serialize(self):
+ return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+ async def run(self):
+ yield TriggerEvent("done")
+
+ mock_get_classpath.return_value = _WatcherTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=10,
+ ti=None,
+ classpath="fake.WatcherTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"my_asset": "s3://bucket/key"},
+ )
+ )
+
+ await runner.create_triggers()
+
+ # This is only testing that an exception was NOT thrown when creating the
Trigger
+ assert 10 in runner.triggers
+
+ assert len(injected_instances) == 1
+ assert injected_instances[0].asset_store is not None
+ assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+ runner.triggers[10]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store stays None when watched_assets is not set on the
workload."""
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+ injected_instances = []
+
+ class _WatcherTrigger(BaseEventTrigger):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ injected_instances.append(self)
+
+ def serialize(self):
+ return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+ async def run(self):
+ yield TriggerEvent("done")
+
+ mock_get_classpath.return_value = _WatcherTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=11,
+ ti=None,
+ classpath="fake.WatcherTrigger",
+ encrypted_kwargs="fake",
+ watched_assets=None,
+ )
+ )
+
+ await runner.create_triggers()
+
+ assert len(injected_instances) == 1
+ assert injected_instances[0].asset_store is None
+
+ runner.triggers[11]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_skips_asset_store_for_non_event_trigger(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store injection is skipped for plain BaseTrigger
(non-BaseEventTrigger) instances."""
+ from airflow.triggers.testing import SuccessTrigger
Review Comment:
Moved.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1286,6 +1293,16 @@ async def create_triggers(self):
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.timeout_after = workload.timeout_after
+ if isinstance(trigger_instance, BaseEventTrigger) and
workload.watched_assets:
+ # Reconstruct AssetStateAccessors from watched_assets
Review Comment:
Removed.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
Review Comment:
Added test!
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
+ """_create_workload() should populate watched_assets when
trigger.task_instance is None and assets exist."""
+ asset1 = mocker.Mock()
+ asset1.name = "my_asset"
+ asset1.uri = "s3://bucket/key"
+
+ asset2 = mocker.Mock()
+ asset2.name = "other_asset"
+ asset2.uri = "gs://bucket/path"
+
+ trigger = mocker.Mock()
+ trigger.id = 42
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None # Not tied to a Task (similar to a
BaseEventTrigger)
+ trigger.assets = [asset1, asset2]
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets == {"my_asset": "s3://bucket/key",
"other_asset": "gs://bucket/path"}
+
+
+def
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor,
mocker):
+ """_create_workload() should set watched_assets=None when
trigger.task_instance is None and assets is empty."""
+ trigger = mocker.Mock()
+ trigger.id = 43
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None
+ trigger.assets = [] # No Assets are attached to the trigger
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+ """RunTrigger workload should accept and store watched_assets."""
+ from airflow.executors.workloads.trigger import RunTrigger
Review Comment:
Moved.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
+ """_create_workload() should populate watched_assets when
trigger.task_instance is None and assets exist."""
+ asset1 = mocker.Mock()
+ asset1.name = "my_asset"
+ asset1.uri = "s3://bucket/key"
+
+ asset2 = mocker.Mock()
+ asset2.name = "other_asset"
+ asset2.uri = "gs://bucket/path"
+
+ trigger = mocker.Mock()
+ trigger.id = 42
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None # Not tied to a Task (similar to a
BaseEventTrigger)
+ trigger.assets = [asset1, asset2]
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets == {"my_asset": "s3://bucket/key",
"other_asset": "gs://bucket/path"}
+
+
+def
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor,
mocker):
+ """_create_workload() should set watched_assets=None when
trigger.task_instance is None and assets is empty."""
+ trigger = mocker.Mock()
+ trigger.id = 43
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None
+ trigger.assets = [] # No Assets are attached to the trigger
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+ """RunTrigger workload should accept and store watched_assets."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+ )
+ assert workload.watched_assets == {"asset_a": "s3://a", "asset_b":
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+ """RunTrigger workload watched_assets should default to None."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ )
+ assert workload.watched_assets is None
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store is populated on BaseEventTrigger instances when
watched_assets is set."""
+ from airflow.sdk.execution_time.context import AssetStoreAccessors
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+ injected_instances = []
+
+ class _WatcherTrigger(BaseEventTrigger):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ injected_instances.append(self)
+
+ def serialize(self):
+ return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+ async def run(self):
+ yield TriggerEvent("done")
+
+ mock_get_classpath.return_value = _WatcherTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=10,
+ ti=None,
+ classpath="fake.WatcherTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"my_asset": "s3://bucket/key"},
+ )
+ )
+
+ await runner.create_triggers()
+
+ # This is only testing that an exception was NOT thrown when creating the
Trigger
+ assert 10 in runner.triggers
+
+ assert len(injected_instances) == 1
+ assert injected_instances[0].asset_store is not None
+ assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+ runner.triggers[10]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store stays None when watched_assets is not set on the
workload."""
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
Review Comment:
Moved.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -558,6 +558,248 @@ def
test_create_workload_uses_supervisor_id_without_job(jobless_supervisor, mock
assert factory.log_path == f"/logs/ti.trigger.{jobless_supervisor.id}.log"
+def
test_create_workload_sets_watched_assets_for_asset_only_trigger(jobless_supervisor,
mocker):
+ """_create_workload() should populate watched_assets when
trigger.task_instance is None and assets exist."""
+ asset1 = mocker.Mock()
+ asset1.name = "my_asset"
+ asset1.uri = "s3://bucket/key"
+
+ asset2 = mocker.Mock()
+ asset2.name = "other_asset"
+ asset2.uri = "gs://bucket/path"
+
+ trigger = mocker.Mock()
+ trigger.id = 42
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None # Not tied to a Task (similar to a
BaseEventTrigger)
+ trigger.assets = [asset1, asset2]
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets == {"my_asset": "s3://bucket/key",
"other_asset": "gs://bucket/path"}
+
+
+def
test_create_workload_watched_assets_none_when_no_assets(jobless_supervisor,
mocker):
+ """_create_workload() should set watched_assets=None when
trigger.task_instance is None and assets is empty."""
+ trigger = mocker.Mock()
+ trigger.id = 43
+ trigger.classpath = "some.path.Trigger"
+ trigger.encrypted_kwargs = "encrypted"
+ trigger.task_instance = None
+ trigger.assets = [] # No Assets are attached to the trigger
+
+ workload = jobless_supervisor._create_workload(
+ trigger=trigger,
+ dag_bag=mocker.Mock(),
+ render_log_fname=mocker.Mock(),
+ session=mocker.Mock(),
+ )
+
+ assert workload is not None
+ assert workload.watched_assets is None
+
+
+def test_run_trigger_workload_includes_watched_assets_field():
+ """RunTrigger workload should accept and store watched_assets."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"asset_a": "s3://a", "asset_b": "gs://b"},
+ )
+ assert workload.watched_assets == {"asset_a": "s3://a", "asset_b":
"gs://b"}
+
+
+def test_run_trigger_workload_watched_assets_defaults_to_none():
+ """RunTrigger workload watched_assets should default to None."""
+ from airflow.executors.workloads.trigger import RunTrigger
+
+ workload = RunTrigger(
+ id=1,
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ encrypted_kwargs="fake",
+ )
+ assert workload.watched_assets is None
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_injects_asset_store_for_base_event_trigger(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store is populated on BaseEventTrigger instances when
watched_assets is set."""
+ from airflow.sdk.execution_time.context import AssetStoreAccessors
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+ injected_instances = []
+
+ class _WatcherTrigger(BaseEventTrigger):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ injected_instances.append(self)
+
+ def serialize(self):
+ return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+ async def run(self):
+ yield TriggerEvent("done")
+
+ mock_get_classpath.return_value = _WatcherTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=10,
+ ti=None,
+ classpath="fake.WatcherTrigger",
+ encrypted_kwargs="fake",
+ watched_assets={"my_asset": "s3://bucket/key"},
+ )
+ )
+
+ await runner.create_triggers()
+
+ # This is only testing that an exception was NOT thrown when creating the
Trigger
+ assert 10 in runner.triggers
+
+ assert len(injected_instances) == 1
+ assert injected_instances[0].asset_store is not None
+ assert isinstance(injected_instances[0].asset_store, AssetStoreAccessors)
+
+ runner.triggers[10]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_asset_store_none_when_no_watched_assets(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store stays None when watched_assets is not set on the
workload."""
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+ injected_instances = []
+
+ class _WatcherTrigger(BaseEventTrigger):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ injected_instances.append(self)
+
+ def serialize(self):
+ return (f"{type(self).__module__}.{type(self).__qualname__}", {})
+
+ async def run(self):
+ yield TriggerEvent("done")
+
+ mock_get_classpath.return_value = _WatcherTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=11,
+ ti=None,
+ classpath="fake.WatcherTrigger",
+ encrypted_kwargs="fake",
+ watched_assets=None,
+ )
+ )
+
+ await runner.create_triggers()
+
+ assert len(injected_instances) == 1
+ assert injected_instances[0].asset_store is None
+
+ runner.triggers[11]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def test_create_triggers_skips_asset_store_for_non_event_trigger(
+ mock_get_classpath, mock_decrypt, session
+):
+ """asset_store injection is skipped for plain BaseTrigger
(non-BaseEventTrigger) instances."""
+ from airflow.triggers.testing import SuccessTrigger
+
+ mock_get_classpath.return_value = SuccessTrigger
+
+ runner = TriggerRunner()
+ runner.to_create.append(
+ workloads.RunTrigger.model_construct(
+ id=12, ti=None,
classpath="airflow.triggers.testing.SuccessTrigger", encrypted_kwargs="fake"
+ )
+ )
+
+ await runner.create_triggers()
+
+ assert 12 in runner.triggers
+ assert not hasattr(runner.triggers[12]["task"], "asset_store")
+
+ runner.triggers[12]["task"].cancel()
+ await runner.cleanup_finished_triggers()
+
+
[email protected]
+@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs",
return_value={})
+@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.get_trigger_by_classpath")
+async def
test_create_triggers_asset_store_contains_correct_assets(mock_get_classpath,
mock_decrypt, session):
+ """AssetStoreAccessors built from watched_assets has entries for all
provided name/URI pairs."""
+ from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk.execution_time.context import AssetStoreAccessors
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
Review Comment:
Moved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]