jscheffl commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2969504190
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -189,12 +189,18 @@ def on_celery_worker_ready(*args, **kwargs):
# and deserialization for us
@app.task(name="execute_workload")
def execute_workload(input: str) -> None:
+ from celery.exceptions import Ignore
from pydantic import TypeAdapter
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.sdk.execution_time.supervisor import supervise
+ try:
+ from airflow.sdk.exceptions import TaskAlreadyRunningError
+ except ImportError:
+ TaskAlreadyRunningError = None # type: ignore[misc,assignment]
+
Review Comment:
If we know which version this fix is being released (can this go into
3.2.0?) then we should rather use the `AIRFLOW_V_3_2_PLUS` pattern such that in
future it is easier to clean-up and it marks it clear which core version is
needed for a feature.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2107,6 +2107,10 @@ def supervise(
final_state=process.final_state,
)
return exit_code
+ except TaskAlreadyRunningError:
+ # Let the executor handle this
+ log.info("Task is already running elsewhere, exiting",
task_instance_id=str(ti.id))
+ raise
Review Comment:
THe only purpose of catching and re-raising is posting an information? Do we
need this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]