ephraimbuddy commented on code in PR #23530:
URL: https://github.com/apache/airflow/pull/23530#discussion_r874967966


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
           description:
             The value can be repeated to retrieve multiple matching values (OR condition).
 
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index

Review Comment:
   Should we have this on the fields themselves? Did you check how it appears
   in the documentation? Is `map_index` required, or should it have a default?
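
To make the `map_index` question concrete, here is a minimal sketch of the default-value option, written in plain Python rather than as the PR's actual schema. The loader name `load_run_task_instance_form` and the constant `UNMAPPED_INDEX` are hypothetical, and the default of -1 assumes the convention that -1 marks a task instance that is not mapped.

```python
from typing import Any, Dict

# Assumption: -1 marks a task instance that is not mapped.
UNMAPPED_INDEX = -1

# The three flags stay required, as in the `required` list under review.
REQUIRED_FLAGS = ("ignore_all_deps", "ignore_task_deps", "ignore_ti_state")


def load_run_task_instance_form(body: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical loader: flags are required, map_index has a default."""
    data: Dict[str, Any] = {}
    for name in REQUIRED_FLAGS:
        if name not in body:
            raise ValueError(f"{name} is required")
        if not isinstance(body[name], bool):
            raise ValueError(f"{name} must be a boolean")
        data[name] = body[name]

    map_index = body.get("map_index", UNMAPPED_INDEX)
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(map_index, bool) or not isinstance(map_index, int):
        raise ValueError("map_index must be an integer")
    data["map_index"] = map_index
    return data
```

With a default in place, a client running an unmapped task could omit `map_index` from the request body, while a client targeting a mapped task instance would still pass it explicitly.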



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
           description:
             The value can be repeated to retrieve multiple matching values (OR condition).
 
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index
+      properties:
+        ignore_all_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.
+        ignore_task_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.

Review Comment:
   ```suggestion
               Ignore task dependencies state during execution.
   ```



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -512,3 +517,84 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def run_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Set a state of task instances."""
+    body = request.get_json()
+    try:
+        data = run_task_instance_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    ignore_all_deps = data.get('ignore_all_deps')
+    ignore_task_deps = data.get('ignore_task_deps')
+    ignore_ti_state = data.get('ignore_ti_state')
+    map_index = data.get('map_index')
+
+    executor = ExecutorLoader.get_default_executor()
+    if not getattr(executor, "supports_ad_hoc_ti_run", False):
+        error_message = "Only works with the Celery, CeleryKubernetes or Kubernetes executors"
+        raise BadRequest(detail=error_message)
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    error_message = f"Task ID {task_id} not found"
+    task = dag.task_dict.get(task_id)
+    if not task:
+        raise NotFound(error_message)
+
+    dag_run = dag.get_dagrun(run_id=dag_run_id)
+    error_message = f"DagRun ID {dag_run_id} not found"
+    if not dag_run:
+        raise NotFound(error_message)
+
+    task_instance = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index)
+    task_instance.refresh_from_task(task)
+    if not task_instance:
+        error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    dep_context = DepContext(

Review Comment:
   A comment explaining why we need this would be nice.



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3581,6 +3619,31 @@ components:
           description:
             The value can be repeated to retrieve multiple matching values (OR condition).
 
+    RunTaskInstanceForm:
+      type: object
+      required:
+        - ignore_all_deps
+        - ignore_task_deps
+        - ignore_ti_state
+        - map_index
+      properties:
+        ignore_all_deps:
+          type: boolean
+          description: |
+            Ignore task instance dependencies state during execution.

Review Comment:
   ```suggestion
               Ignore all dependencies state during execution.
   ```



##########
tests/api_connexion/endpoints/test_task_instance_endpoint.py:
##########
@@ -1491,3 +1492,111 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
         )
         assert response.status_code == 400
         assert response.json['detail'] == expected
+
+
+class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
+    def heartbeat(self):
+        return True
+
+
+@mock.patch(
+    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
+    return_value=_ForceHeartbeatCeleryExecutor(),
+)
+class TestTaskInstanceRunEndpoint(TestTaskInstanceEndpoint):

Review Comment:
   We should also test for permissions.



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -512,3 +517,84 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def run_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Set a state of task instances."""

Review Comment:
   Is this description correct?


