ephraimbuddy commented on code in PR #23530:
URL: https://github.com/apache/airflow/pull/23530#discussion_r874967966
##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
       description:
         The value can be repeated to retrieve multiple matching values (OR condition).
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index
Review Comment:
Should we have this on the fields themselves? Did you check how it appears
in the documentation? Is map_index required, or should it have a default?
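For illustration only, here is a minimal stdlib sketch of the second option, where the three flags stay required and `map_index` falls back to a default. The names `load_run_task_instance_form` and `UNMAPPED_MAP_INDEX` are hypothetical and not part of the PR; the default of -1 assumes the convention that unmapped task instances carry a map_index of -1. The real change would belong in the marshmallow schema and the OpenAPI spec rather than in a helper like this.

```python
from typing import Any, Dict

# Assumption: -1 marks a task instance that is not mapped.
UNMAPPED_MAP_INDEX = -1
REQUIRED_FLAGS = ("ignore_all_deps", "ignore_task_deps", "ignore_ti_state")


def load_run_task_instance_form(body: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the request body, defaulting map_index when it is omitted."""
    errors: Dict[str, str] = {}
    data: Dict[str, Any] = {}

    # The three flags remain required and must be real booleans.
    for name in REQUIRED_FLAGS:
        if name not in body:
            errors[name] = "Missing data for required field."
        elif not isinstance(body[name], bool):
            errors[name] = "Not a valid boolean."
        else:
            data[name] = body[name]

    # map_index is optional; bool is rejected because it subclasses int.
    map_index = body.get("map_index", UNMAPPED_MAP_INDEX)
    if isinstance(map_index, bool) or not isinstance(map_index, int):
        errors["map_index"] = "Not a valid integer."
    else:
        data["map_index"] = map_index

    if errors:
        raise ValueError(errors)
    return data
```

With this shape a client running an unmapped task can omit `map_index` entirely, while a client targeting a mapped task passes the index explicitly.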
##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
       description:
         The value can be repeated to retrieve multiple matching values (OR condition).
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index
+      properties:
+        ignore_all_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.
+        ignore_task_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.
Review Comment:
```suggestion
Ignore task dependencies state during execution.
```
##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -512,3 +517,84 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def run_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Set a state of task instances."""
+    body = request.get_json()
+    try:
+        data = run_task_instance_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    ignore_all_deps = data.get('ignore_all_deps')
+    ignore_task_deps = data.get('ignore_task_deps')
+    ignore_ti_state = data.get('ignore_ti_state')
+    map_index = data.get('map_index')
+
+    executor = ExecutorLoader.get_default_executor()
+    if not getattr(executor, "supports_ad_hoc_ti_run", False):
+        error_message = "Only works with the Celery, CeleryKubernetes or Kubernetes executors"
+        raise BadRequest(detail=error_message)
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    error_message = f"Task ID {task_id} not found"
+    task = dag.task_dict.get(task_id)
+    if not task:
+        raise NotFound(error_message)
+
+    dag_run = dag.get_dagrun(run_id=dag_run_id)
+    error_message = f"DagRun ID {dag_run_id} not found"
+    if not dag_run:
+        raise NotFound(error_message)
+
+    task_instance = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index)
+    task_instance.refresh_from_task(task)
+    if not task_instance:
+        error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    dep_context = DepContext(
Review Comment:
A comment explaining why we need this would be nice.
##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
       description:
         The value can be repeated to retrieve multiple matching values (OR condition).
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index
+      properties:
+        ignore_all_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.
Review Comment:
```suggestion
Ignore all dependencies state during execution.
```
##########
tests/api_connexion/endpoints/test_task_instance_endpoint.py:
##########
@@ -1491,3 +1492,111 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
         )
         assert response.status_code == 400
         assert response.json['detail'] == expected
+
+
+class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
+    def heartbeat(self):
+        return True
+
+
+@mock.patch(
+    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
+    return_value=_ForceHeartbeatCeleryExecutor(),
+)
+class TestTaskInstanceRunEndpoint(TestTaskInstanceEndpoint):
Review Comment:
We should also test for permissions.
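As a rough sketch of which cases such a test could cover, the snippet below enumerates one case with the full permission set and one per missing permission. It is stdlib-only and does not use the Airflow test client; `REQUIRED_PERMISSIONS`, `expected_status`, and `permission_test_cases` are hypothetical names, and the string pairs are stand-ins for the `permissions.*` constants on the endpoint's decorator.

```python
from typing import FrozenSet, Iterable, Iterator, Tuple

Permission = Tuple[str, str]

# Hypothetical stand-ins for the (action, resource) pairs on the endpoint.
REQUIRED_PERMISSIONS: FrozenSet[Permission] = frozenset(
    {
        ("can_edit", "DAG"),
        ("can_read", "DAG_RUN"),
        ("can_edit", "TASK_INSTANCE"),
    }
)


def expected_status(user_permissions: Iterable[Permission]) -> int:
    """Return 200 when every required permission is held, otherwise 403."""
    return 200 if REQUIRED_PERMISSIONS <= frozenset(user_permissions) else 403


def permission_test_cases() -> Iterator[Tuple[FrozenSet[Permission], int]]:
    """Yield (permissions, expected status): full set, each set missing one, none."""
    yield REQUIRED_PERMISSIONS, 200
    for missing in sorted(REQUIRED_PERMISSIONS):
        yield REQUIRED_PERMISSIONS - {missing}, 403
    yield frozenset(), 403
```

In the real test class, each case would presumably become a request sent as a user holding that permission set, with an assertion on the response status code.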
##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -512,3 +517,84 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def run_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Set a state of task instances."""
Review Comment:
Is this description correct?