amoghrajesh commented on code in PR #67839:
URL: https://github.com/apache/airflow/pull/67839#discussion_r3338985312
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
Review Comment:
Shouldn't `_handle_request` be handling this here?
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1266,6 +1269,16 @@ async def create_triggers(self):
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.timeout_after = workload.timeout_after
+ if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets:
+ # Reconstruct AssetStateAccessors from watched_assets
+ from airflow.sdk.definitions.asset import Asset
+ from airflow.sdk.execution_time.context import AssetStateAccessors
Review Comment:
This is already a top-level import.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -741,11 +737,18 @@ def _create_workload(
render_log_fname: Callable[..., str],
session: Session,
) -> workloads.RunTrigger | None:
+ # Pass the "watched" Assets through for downstream use in BaseEventTrigger
if trigger.task_instance is None:
+ watched_assets: dict[str, str] | None = None
+
+ if trigger.asset_watchers:
+ watched_assets = {a.name: a.uri for a in trigger.assets}
Review Comment:
Use either `trigger.assets` or `trigger.asset_watchers` for both the `if` check
and the iteration, rather than checking one and iterating over the other.
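A rough sketch of what I mean, checking and iterating over the same attribute. `FakeAsset`, `FakeTrigger` and `build_watched_assets` are hypothetical stand-ins for illustration only, not the real Airflow models or helpers:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeAsset:
    # Hypothetical stand-in for an asset; only name and uri matter here.
    name: str
    uri: str


@dataclass
class FakeTrigger:
    # Hypothetical stand-in for the trigger model touched in the diff.
    assets: list[FakeAsset] = field(default_factory=list)


def build_watched_assets(trigger: FakeTrigger) -> dict[str, str] | None:
    # Guard and comprehension both read trigger.assets, so they cannot disagree.
    if not trigger.assets:
        return None
    return {a.name: a.uri for a in trigger.assets}
```

The same shape works with `trigger.asset_watchers` if that is the preferred source, as long as the guard and the loop agree.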