ashb commented on a change in pull request #18724:
URL: https://github.com/apache/airflow/pull/18724#discussion_r722122438
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -287,11 +287,38 @@ def post_set_task_instances_state(dag_id, session):
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+ if not run_id and not execution_date:
+ raise BadRequest(detail="You must provide either the dag_run_id or the
execution_date")
+ if run_id and execution_date:
+ try:
+ session.query(TI).join(TI.dag_run).filter(
+ TI.execution_date == execution_date, TI.task_id == task_id,
TI.dag_id == dag_id
+ ).filter(DR.run_id == run_id).one()
+ except NoResultFound:
+ raise NotFound(
+ detail=f"DagRun with run_id: {run_id} and execution_date:
{execution_date} not found"
+ )
+ if run_id and not execution_date:
+ try:
+ ti = (
+ session.query(TI)
+ .join(TI.dag_run)
+ .filter(TI.task_id == task_id, TI.dag_id == dag_id)
+ .filter(DR.run_id == run_id)
+ .one()
+ )
+ execution_date = ti.execution_date
Review comment:
Yeah, I think this should be reworked -- it seems a bit wasteful to load
up the dag run, get the execution date, and "throw out" the dag run we loaded,
only to load it again later!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]