ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run
skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380119916
##########
File path: airflow/models/skipmixin.py
##########
@@ -65,13 +61,44 @@ def skip(self, dag_run, execution_date, tasks, session=None):
ti.end_date = now
session.merge(ti)
- session.commit()
+ @provide_session
+ def skip(
+ self,
+ dag_run,
+ execution_date,
+ tasks,
+ session=None,
+ ti: Optional[TaskInstance] = None,
+ ):
+ """
+ Sets tasks instances to skipped from the same dag run. If ti is given, store the list of
+ skipped task IDs to XCom so that NotPreviouslySkippedDep knows these tasks should be skipped
+ when they are cleared.
+
+ :param dag_run: the DagRun for which to set the tasks to skipped
+ :param execution_date: execution_date
+ :param tasks: tasks to skip (not task_ids)
+ :param session: db session to use
+ :param ti: The TaskInstance that initiates the skip. Used for storing results to XCom.
+ If not given, skipped task IDs will not be stored to XCom.
+ """
+ if not tasks:
+ return
+
+ self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+ session.commit()
+ if ti:
Review comment:
We can avoid needing a TI here, and thus avoid changing the function
signature, by calling `XCom.set` directly:
```
XCom.set(
key=XCOM_SKIPMIXIN_KEY,
value={"skipped": [d.task_id for d in tasks]},
execution_date=execution_date,
task_id=self.task_id,
dag_id=dag_run.dag_id,
session=session
)
```
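To illustrate the idea without depending on an Airflow install, here is a minimal sketch of a `skip` method that keeps its original signature and records the skipped task IDs itself. `FakeXCom`, `Task`, `DagRun`, and `SkipSketch` are hypothetical stand-ins, not Airflow classes; `FakeXCom.set` only mirrors the keyword arguments used in the snippet above, and the value of `XCOM_SKIPMIXIN_KEY` is a placeholder. It also assumes the class using the mixin exposes `task_id`, as an operator would.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

XCOM_SKIPMIXIN_KEY = "skipmixin_key"  # placeholder value, assumed for this sketch


class FakeXCom:
    """Hypothetical in-memory stand-in for XCom (not the Airflow class)."""

    _store: Dict[Tuple[str, str, datetime, str], Any] = {}

    @classmethod
    def set(cls, key, value, execution_date, task_id, dag_id, session=None):
        # Key the value by dag, task, execution date and XCom key.
        cls._store[(dag_id, task_id, execution_date, key)] = value

    @classmethod
    def get_one(cls, key, execution_date, task_id, dag_id) -> Optional[Any]:
        return cls._store.get((dag_id, task_id, execution_date, key))


@dataclass
class Task:
    task_id: str


@dataclass
class DagRun:
    dag_id: str


class SkipSketch:
    """Hypothetical operator-like class that owns a task_id."""

    def __init__(self, task_id: str):
        self.task_id = task_id

    def skip(self, dag_run: DagRun, execution_date: datetime,
             tasks: List[Task], session=None) -> None:
        if not tasks:
            return
        # No TaskInstance argument: everything needed to address the
        # XCom row is already available from self, dag_run and the arguments.
        FakeXCom.set(
            key=XCOM_SKIPMIXIN_KEY,
            value={"skipped": [t.task_id for t in tasks]},
            execution_date=execution_date,
            task_id=self.task_id,
            dag_id=dag_run.dag_id,
            session=session,
        )


when = datetime(2020, 1, 1)
op = SkipSketch("branch")
op.skip(DagRun("example_dag"), when, [Task("a"), Task("b")])
stored = FakeXCom.get_one(XCOM_SKIPMIXIN_KEY, when, "branch", "example_dag")
```

The point of the sketch is only that the caller does not have to pass a TI, so existing call sites of `skip` would keep working unchanged.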