uranusjr commented on a change in pull request #18724:
URL: https://github.com/apache/airflow/pull/18724#discussion_r774963161
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -309,15 +308,34 @@ def post_set_task_instances_state(*, dag_id: str,
session: Session = NEW_SESSION
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
- try:
- session.query(TI).filter_by(execution_date=execution_date, task_id=task_id, dag_id=dag_id).one()
- except NoResultFound:
- raise NotFound(f"Task instance not found for task {task_id} on execution_date {execution_date}")
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+
+ if run_id is not None:
+ ti = (
+ session.query(func.count(TI.dag_id))
+ .filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.run_id == run_id)
+ .scalar()
+ )
+ if not ti:
+ raise NotFound(
+ detail=f"Task instance not found for task {task_id!r} on DAG run with ID {run_id!r}"
+ )
+ elif execution_date is not None:
+ ti = (
+ session.query(TI.run_id)
+ .filter(TI.execution_date == execution_date, TI.task_id == task_id, TI.dag_id == dag_id)
+ .first()
+ )
+ if not ti:
+ raise NotFound(
+ detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date}"
+ )
+ run_id = ti.run_id
Review comment:
Can (should?) this backfilling of `run_id` from `execution_date` be done in
the schema class?
##########
File path: airflow/models/dagrun.py
##########
@@ -296,9 +297,9 @@ def next_dagruns_to_examine(
def find(
cls,
dag_id: Optional[Union[str, List[str]]] = None,
- run_id: Optional[str] = None,
- execution_date: Optional[Union[datetime, List[datetime]]] = None,
- state: Optional[Union[str, DagRunState]] = None,
+ run_id: Optional[Iterable] = None,
+ execution_date: Optional[Union[datetime, Iterable]] = None,
Review comment:
```suggestion
run_id: Optional[Iterable[str]] = None,
execution_date: Optional[Union[datetime, Iterable[datetime]]] = None,
```
##########
File path: airflow/models/dag.py
##########
@@ -1661,14 +1665,22 @@ def set_task_instance_state(
:param past: Include all past TaskInstances of the given task_id
:type past: bool
"""
- from airflow.api.common.experimental.mark_tasks import set_state
+ from airflow.api.common.mark_tasks import set_state
+
+ if not (execution_date is None) ^ (dag_run_id is None):
Review comment:
Let’s use the `exactly_one` util added recently.
##########
File path: tests/conftest.py
##########
@@ -533,7 +533,7 @@ def create_dagrun(self, **kwargs):
if "run_type" not in kwargs:
kwargs["run_type"] = DagRunType.from_run_id(kwargs["run_id"])
- if "execution_date" not in kwargs:
+ if "execution_date" not in kwargs or kwargs["execution_date"] is None:
Review comment:
Maybe
```suggestion
if kwargs.get("execution_date") is None:
```
##########
File path: airflow/models/dag.py
##########
@@ -1690,6 +1702,9 @@ def set_task_instance_state(
include_upstream=False,
)
+ if dag_run_id:
+ execution_date = dag_run.execution_date
Review comment:
Should this be merged into the `if dag_run_id` block above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]