uranusjr commented on a change in pull request #18724:
URL: https://github.com/apache/airflow/pull/18724#discussion_r753907424
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +297,33 @@ def post_set_task_instances_state(dag_id, session):
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
- try:
- session.query(TI).filter_by(execution_date=execution_date,
task_id=task_id, dag_id=dag_id).one()
- except NoResultFound:
- raise NotFound(f"Task instance not found for task {task_id} on
execution_date {execution_date}")
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+ if not run_id and not execution_date:
+ raise BadRequest(detail="You must provide either the dag_run_id or the
execution_date")
+ if run_id and execution_date:
+ raise BadRequest(detail="You cannot provide both the dag_run_id and
the execution_date")
Review comment:
I think this would be better handled in the schema
(`set_task_instance_state_form`) instead. There are existing examples of this.
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +297,33 @@ def post_set_task_instances_state(dag_id, session):
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
- try:
- session.query(TI).filter_by(execution_date=execution_date,
task_id=task_id, dag_id=dag_id).one()
- except NoResultFound:
- raise NotFound(f"Task instance not found for task {task_id} on
execution_date {execution_date}")
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+ if not run_id and not execution_date:
+ raise BadRequest(detail="You must provide either the dag_run_id or the
execution_date")
+ if run_id and execution_date:
+ raise BadRequest(detail="You cannot provide both the dag_run_id and
the execution_date")
+ if run_id and not execution_date:
+ try:
+ session.query(TI).join(TI.dag_run).filter(TI.task_id == task_id,
TI.dag_id == dag_id).filter(
+ DR.run_id == run_id
+ ).one()
+ except NoResultFound:
+ raise NotFound(detail=f"The taskinstance with dag_run_id: {run_id}
not found")
Review comment:
```suggestion
raise NotFound(detail=f"Task instance not found for task
{task_id} on DAG run with ID {run_id}")
```
to better match the other error message (the one for `execution_date`).
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +297,33 @@ def post_set_task_instances_state(dag_id, session):
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
- try:
- session.query(TI).filter_by(execution_date=execution_date,
task_id=task_id, dag_id=dag_id).one()
- except NoResultFound:
- raise NotFound(f"Task instance not found for task {task_id} on
execution_date {execution_date}")
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+ if not run_id and not execution_date:
+ raise BadRequest(detail="You must provide either the dag_run_id or the
execution_date")
+ if run_id and execution_date:
+ raise BadRequest(detail="You cannot provide both the dag_run_id and
the execution_date")
+ if run_id and not execution_date:
+ try:
+ session.query(TI).join(TI.dag_run).filter(TI.task_id == task_id,
TI.dag_id == dag_id).filter(
+ DR.run_id == run_id
+ ).one()
Review comment:
As a related improvement, IMO it would be better to use `count()` for
both of these checks, since the queried task instance itself is never used.
##########
File path: airflow/api_connexion/endpoints/task_instance_endpoint.py
##########
@@ -297,15 +297,33 @@ def post_set_task_instances_state(dag_id, session):
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)
- execution_date = data['execution_date']
- try:
- session.query(TI).filter_by(execution_date=execution_date,
task_id=task_id, dag_id=dag_id).one()
- except NoResultFound:
- raise NotFound(f"Task instance not found for task {task_id} on
execution_date {execution_date}")
+ execution_date = data.get('execution_date')
+ run_id = data.get('dag_run_id')
+ if not run_id and not execution_date:
+ raise BadRequest(detail="You must provide either the dag_run_id or the
execution_date")
+ if run_id and execution_date:
+ raise BadRequest(detail="You cannot provide both the dag_run_id and
the execution_date")
+ if run_id and not execution_date:
+ try:
+ session.query(TI).join(TI.dag_run).filter(TI.task_id == task_id,
TI.dag_id == dag_id).filter(
+ DR.run_id == run_id
+ ).one()
Review comment:
```suggestion
session.query(TI).join(TI.dag_run).filter(
TI.task_id == task_id,
TI.dag_id == dag_id,
DR.run_id == run_id,
).one()
```
##########
File path: airflow/models/dag.py
##########
@@ -1619,10 +1621,12 @@ def set_task_instance_state(
:param task_id: Task ID of the TaskInstance
:type task_id: str
- :param execution_date: execution_date of the TaskInstance
- :type execution_date: datetime
+ :param execution_date: Optional execution_date of the TaskInstance
+ :type execution_date: Optional[datetime]
:param state: State to set the TaskInstance to
:type state: State
+ :param run_id: Optional run_id for the TaskInstance
+ :type run_id: Optional[str]
Review comment:
Same as above: we should either check that exactly one of `execution_date` and `run_id` is passed, or document that the caller is expected to check this.
##########
File path: airflow/api/common/experimental/mark_tasks.py
##########
@@ -62,8 +62,10 @@ def _create_dagruns(dag, execution_dates, state, run_type):
@provide_session
def set_state(
+ *,
tasks: Iterable[BaseOperator],
- execution_date: datetime.datetime,
+ execution_date: Optional[datetime.datetime] = None,
+ run_id: Optional[str] = None,
Review comment:
Should we check that _exactly one_ of `run_id` and `execution_date` is
passed to this function? If not, we should mention in the docstring that
this function expects the _caller_ to check first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]