dabla commented on code in PR #55068:
URL: https://github.com/apache/airflow/pull/55068#discussion_r2499007781
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -614,63 +658,46 @@ def update_triggers(self, requested_trigger_ids: set[int]):
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids - known_trigger_ids
cancel_trigger_ids = self.running_triggers - requested_trigger_ids
- # Bulk-fetch new trigger records
- new_triggers = Trigger.bulk_fetch(new_trigger_ids)
- trigger_ids_with_non_task_associations =
Trigger.fetch_trigger_ids_with_non_task_associations()
- to_create: list[workloads.RunTrigger] = []
- # Add in new triggers
- for new_id in new_trigger_ids:
- # Check it didn't vanish in the meantime
- if new_id not in new_triggers:
- log.warning("Trigger disappeared before we could start it",
id=new_id)
- continue
- new_trigger_orm = new_triggers[new_id]
-
- # If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
- # row was updated by either Trigger.submit_event or Trigger.submit_failure
- # and can happen when a single trigger Job is being run on multiple TriggerRunners
- # in a High-Availability setup.
- if new_trigger_orm.task_instance is None and new_id not in
trigger_ids_with_non_task_associations:
- log.info(
- (
- "TaskInstance Trigger is None. It was likely updated
by another trigger job. "
- "Skipping trigger instantiation."
- ),
- id=new_id,
- )
- continue
-
- workload = workloads.RunTrigger(
- classpath=new_trigger_orm.classpath,
- id=new_id,
- encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
- ti=None,
+ with create_session() as session:
+ # Bulk-fetch new trigger records
+ new_triggers = Trigger.bulk_fetch(new_trigger_ids, session=session)
+ trigger_ids_with_non_task_associations =
Trigger.fetch_trigger_ids_with_non_task_associations(
+ session=session
)
- if new_trigger_orm.task_instance:
- log_path = render_log_fname(ti=new_trigger_orm.task_instance)
- if not new_trigger_orm.task_instance.dag_version_id:
- # This is to handle 2 to 3 upgrade where TI.dag_version_id
can be none
- log.warning(
- "TaskInstance associated with Trigger has no
associated Dag Version, skipping the trigger",
- ti_id=new_trigger_orm.task_instance.id,
+ to_create: list[workloads.RunTrigger] = []
+ # Add in new triggers
+ for new_id in new_trigger_ids:
+ # Check it didn't vanish in the meantime
+ if new_id not in new_triggers:
+ log.warning("Trigger disappeared before we could start
it", id=new_id)
+ continue
+
+ new_trigger_orm = new_triggers[new_id]
+
+ # If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
+ # row was updated by either Trigger.submit_event or Trigger.submit_failure
+ # and can happen when a single trigger Job is being run on multiple TriggerRunners
+ # in a High-Availability setup.
+ if (
+ new_trigger_orm.task_instance is None
+ and new_id not in trigger_ids_with_non_task_associations
+ ):
+ log.info(
+ (
+ "TaskInstance Trigger is None. It was likely
updated by another trigger job. "
+ "Skipping trigger instantiation."
+ ),
+ id=new_id,
)
continue
- ser_ti = workloads.TaskInstance.model_validate(
- new_trigger_orm.task_instance, from_attributes=True
- )
- # When producing logs from TIs, include the job id producing
the logs to disambiguate it.
- self.logger_cache[new_id] = TriggerLoggingFactory(
- log_path=f"{log_path}.trigger.{self.job.id}.log",
- ti=ser_ti, # type: ignore
- )
- workload.ti = ser_ti
- workload.timeout_after =
new_trigger_orm.task_instance.trigger_timeout
+ workload = create_workload(new_trigger_orm, session=session)
- to_create.append(workload)
+ if workload:
+ to_create.append(workload)
Review Comment:
Applied it, good one!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]