pierrejeambrun commented on code in PR #29513:
URL: https://github.com/apache/airflow/pull/29513#discussion_r1109194909
##########
airflow/models/taskinstance.py:
##########
@@ -1270,23 +1333,24 @@ def check_and_change_state_before_execution(
ignore_task_deps=ignore_task_deps,
description="non-requeueable deps",
)
- if not self.are_dependencies_met(
+ if not ti.are_dependencies_met(
dep_context=non_requeueable_dep_context, session=session, verbose=True
):
session.commit()
- return False
+ make_transient(ti)
Review Comment:
The task instance is expired if the session was not passed down by the parent
caller but created by `@provide_session`, because that session is closed when
the function returns. We need to call `make_transient` so that we can do:
```
ti = TI.check_and_change_state_before_execution()
```
There is only one caller that does not pass a session down:
`LocalTaskJob._execute`.
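A minimal SQLAlchemy sketch of the general pattern follows. It is not Airflow code. `provide_session` here is a simplified, hypothetical stand-in that opens a session when the caller passes none and closes it on return. `TaskRow` is a hypothetical model, and the session factory assumes `expire_on_commit=False`. Calling `make_transient` before returning detaches the object and drops its identity key, so the caller gets a plain object whose loaded attributes stay readable after the decorator's session is closed.

```python
import functools

from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, make_transient, sessionmaker

Base = declarative_base()


class TaskRow(Base):
    """Hypothetical stand-in for a task instance row."""

    __tablename__ = "task_row"
    id = Column(Integer, primary_key=True)
    state = Column(String(20))


engine = create_engine("sqlite://")  # in-memory database for the sketch
Base.metadata.create_all(engine)
# Assumption: committing does not expire already-loaded attributes.
SessionFactory = sessionmaker(bind=engine, expire_on_commit=False)


def provide_session(func):
    """Simplified, hypothetical decorator: reuse the caller's session if one
    is passed, otherwise open one and commit/close it when func returns."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        session = SessionFactory()
        try:
            result = func(*args, session=session, **kwargs)
            session.commit()
            return result
        finally:
            session.close()

    return wrapper


@provide_session
def load_row(row_id, session=None):
    row = session.get(TaskRow, row_id)
    session.commit()
    # Detach the row and drop its identity key so the caller receives a
    # plain, session-less object after the decorator closes the session.
    make_transient(row)
    return row


# Seed one row, then load it without passing a session (the case where the
# parent caller has no session).
with SessionFactory() as seed:
    seed.add(TaskRow(id=1, state="queued"))
    seed.commit()

row = load_row(1)
print(row.state, inspect(row).transient)  # queued True
```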
##########
airflow/models/taskinstance.py:
##########
@@ -1207,9 +1252,15 @@ def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun:
return dr
+ @staticmethod
+ @internal_api_call
@provide_session
def check_and_change_state_before_execution(
- self,
+ dag_id: str,
Review Comment:
The task instance primary key is composite (`dag_id`, `run_id`, `task_id`,
`map_index`), so the call is a bit more verbose: without `self`, all four
fields have to be passed explicitly.
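The following stdlib `sqlite3` sketch shows why the static variant needs the whole key. It uses a hypothetical `task_instance` table that is reduced to the key columns plus `state`. Two rows can differ only in `map_index` and still be distinct, so a lookup that has no instance to read the key from must receive all four fields. The table and the `retrieve` helper are assumptions for illustration, not Airflow's schema or API.

```python
import sqlite3
from typing import Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        state TEXT,
        PRIMARY KEY (dag_id, run_id, task_id, map_index)
    )
    """
)
# Two mapped instances of the same task in the same run: only map_index differs.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("example_dag", "run_1", "extract", 0, "queued"),
        ("example_dag", "run_1", "extract", 1, "running"),
    ],
)


def retrieve(dag_id: str, run_id: str, task_id: str, map_index: int) -> Optional[Tuple]:
    """Hypothetical helper: every primary-key column must be passed explicitly."""
    return conn.execute(
        "SELECT state FROM task_instance"
        " WHERE dag_id = ? AND run_id = ? AND task_id = ? AND map_index = ?",
        (dag_id, run_id, task_id, map_index),
    ).fetchone()


print(retrieve("example_dag", "run_1", "extract", 1))  # ('running',)
```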
##########
airflow/models/taskinstance.py:
##########
@@ -1237,25 +1293,32 @@ def check_and_change_state_before_execution(
:param mark_success: Don't run the task, mark its state as success
:param test_mode: Doesn't record success or failure in the DB
:param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
- :param pool: specifies the pool to use to run the task instance
+ :param pool: Specifies the pool to use to run the task instance
:param external_executor_id: The identifier of the celery executor
:param session: SQLAlchemy ORM Session
:return: whether the state was changed to running or not
"""
- task = self.task
- self.refresh_from_task(task, pool_override=pool)
- self.test_mode = test_mode
- self.refresh_from_db(session=session, lock_for_update=True)
- self.job_id = job_id
- self.hostname = get_hostname()
- self.pid = None
+ ti = TaskInstance.retrieve_from_db(
+ dag_id, run_id, task_id, map_index, session=session, lock_for_update=True
+ )
+ if ti is None:
Review Comment:
Sometimes this function is called before there is a DB record for the task
instance; see `test_check_and_change_state_before_execution_dep_not_met`,
which highlights this case.
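Because the lookup can come back empty, the result has to be checked before any attribute is accessed. Below is a stdlib `sqlite3` sketch of such a guard. The table, the `check_and_change_state` helper, and the choice to return `False` when no row exists are assumptions for illustration. The hunk stops at `if ti is None:`, so it does not show what the PR does in that branch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    " dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, state TEXT,"
    " PRIMARY KEY (dag_id, run_id, task_id, map_index))"
)
conn.execute(
    "INSERT INTO task_instance VALUES ('example_dag', 'run_1', 'load', 0, 'queued')"
)

KEY_FILTER = " WHERE dag_id = ? AND run_id = ? AND task_id = ? AND map_index = ?"


def check_and_change_state(dag_id, run_id, task_id, map_index):
    """Hypothetical sketch: set the row to 'running' and report whether the
    state was changed. Returns False when no row exists yet for this key."""
    key = (dag_id, run_id, task_id, map_index)
    row = conn.execute("SELECT state FROM task_instance" + KEY_FILTER, key).fetchone()
    if row is None:
        # No DB record yet for this key: report that nothing was changed
        # instead of failing on attribute access of a missing record.
        return False
    conn.execute("UPDATE task_instance SET state = 'running'" + KEY_FILTER, key)
    return True


print(check_and_change_state("example_dag", "run_1", "load", 0))     # True
print(check_and_change_state("example_dag", "run_1", "missing", 0))  # False
```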
--
This is an automated message from the Apache Git Service.