pierrejeambrun commented on code in PR #55660:
URL: https://github.com/apache/airflow/pull/55660#discussion_r2394480150
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -217,13 +218,22 @@ def clear_task_instances(
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
+ task_confirmed_running = False
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
+ if prevent_running_task:
+ # If the task contains the message, prevent the task from running.
+ task_confirmed_running = True
+
if ti.state == TaskInstanceState.RUNNING:
+ if task_confirmed_running:
Review Comment:
```suggestion
if prevent_running_task:
```
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -217,13 +218,22 @@ def clear_task_instances(
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
+ task_confirmed_running = False
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
+ if prevent_running_task:
+ # If the task contains the message, prevent the task from running.
+ task_confirmed_running = True
+
if ti.state == TaskInstanceState.RUNNING:
+ if task_confirmed_running:
+ raise ValueError(f"Error: Task {ti.task_id} is running, stopping attempt to run.")
# If a task is cleared when running, set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
Review Comment:
Move this comment inside the `else`.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -770,12 +772,16 @@ def post_clear_task_instances(
)
if not dry_run:
- clear_task_instances(
- task_instances,
- session,
- DagRunState.QUEUED if reset_dag_runs else False,
- run_on_latest_version=body.run_on_latest_version,
- )
+ try:
+ clear_task_instances(
+ task_instances,
+ session,
+ DagRunState.QUEUED if reset_dag_runs else False,
+ run_on_latest_version=body.run_on_latest_version,
+ prevent_running_task=body.prevent_running_task,
+ )
+ except ValueError as e:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
Review Comment:
That pattern is OK, but we should do one of the following:
- Check the message of the `ValueError`. We only want this handler to cover a
`ValueError` caused by the `prevent_running_task` behavior. Any other, unexpected
`ValueError` shouldn't be turned into a 400 user error.
- Change the exception raised there, maybe to a custom exception such as
`ClearRunningTaskException` or something like that, so you can catch it and be
sure not to interfere with the handling of other exceptions.
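
The second option could look roughly like the sketch below. It is a self-contained illustration, not Airflow code: `ClearRunningTaskException` is the name floated above, while `BadRequest`, `FakeTI`, `clear_task_instances_sketch`, and `post_clear_sketch` are hypothetical stand-ins for `HTTPException`, the task instance model, the model function, and the route handler.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


class ClearRunningTaskException(Exception):
    """Hypothetical: raised when clearing is refused because a task is running."""


class BadRequest(Exception):
    """Stand-in for an HTTP 400 error such as FastAPI's HTTPException."""


@dataclass
class FakeTI:
    """Minimal stand-in for a task instance (assumption, not the real model)."""

    task_id: str
    state: Optional[str]


def clear_task_instances_sketch(tis: Iterable[FakeTI], prevent_running_task: bool = False) -> None:
    for ti in tis:
        if ti.state == "running":
            if prevent_running_task:
                # Dedicated exception type: callers can catch exactly this case.
                raise ClearRunningTaskException(
                    f"Task {ti.task_id} is running, stopping attempt to clear."
                )
            ti.state = "restarting"
        else:
            ti.state = None


def post_clear_sketch(tis: Iterable[FakeTI], prevent_running_task: bool) -> str:
    try:
        clear_task_instances_sketch(tis, prevent_running_task)
    except ClearRunningTaskException as e:
        # Only the "task is running" refusal becomes a user-facing 400.
        raise BadRequest(str(e)) from e
    return "cleared"
```

With this shape, an unrelated `ValueError` raised during clearing is not caught by the handler and would surface as a server error rather than a 400.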
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -217,13 +218,22 @@ def clear_task_instances(
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
+ task_confirmed_running = False
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
+ if prevent_running_task:
+ # If the task contains the message, prevent the task from running.
+ task_confirmed_running = True
+
Review Comment:
I don't understand this. It is inside the `tis` loop but doesn't actually
depend on the `ti` at hand.
Also, you probably don't need another variable. Just use
`prevent_running_task` directly.
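
A minimal sketch of what using the flag directly might look like, with no intermediate variable. `State`, `FakeTI`, and `clear_sketch` are hypothetical stand-ins so the example runs on its own; the real function also takes a session and does more work per task instance.

```python
from enum import Enum
from typing import Iterable, Optional


class State(str, Enum):
    """Stand-in for the task instance states used here (assumption)."""

    RUNNING = "running"
    RESTARTING = "restarting"
    SUCCESS = "success"


class FakeTI:
    """Minimal stand-in for a task instance."""

    def __init__(self, task_id: str, state: Optional[State]) -> None:
        self.task_id = task_id
        self.state = state


def clear_sketch(tis: Iterable[FakeTI], prevent_running_task: bool = False) -> None:
    for ti in tis:
        if ti.state == State.RUNNING:
            # The flag is checked directly; it does not depend on the ti,
            # so no per-loop helper variable is needed.
            if prevent_running_task:
                raise ValueError(
                    f"Task {ti.task_id} is running, stopping attempt to clear."
                )
            # If a task is cleared when running, set its state to RESTARTING
            # so that the task is terminated and becomes eligible for retry.
            ti.state = State.RESTARTING
        else:
            ti.state = None
```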
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -217,13 +218,22 @@ def clear_task_instances(
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
+ task_confirmed_running = False
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
+ if prevent_running_task:
+ # If the task contains the message, prevent the task from running.
+ task_confirmed_running = True
+
if ti.state == TaskInstanceState.RUNNING:
+ if task_confirmed_running:
+ raise ValueError(f"Error: Task {ti.task_id} is running, stopping attempt to run.")
Review Comment:
```suggestion
raise ValueError(f"Error: Task {ti.task_id} is running, stopping attempt to clear.")
```