anishgirianish commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2791199602
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -159,15 +165,21 @@ def execute_workload(input: str) -> None:
base_url = f"http://localhost:8080{base_url}"
default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
-    supervise(
-        # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
-        ti=workload.ti,  # type: ignore[arg-type]
-        dag_rel_path=workload.dag_rel_path,
-        bundle_info=workload.bundle_info,
-        token=workload.token,
-        server=conf.get("core", "execution_api_server_url", fallback=default_execution_api_server),
-        log_path=workload.log_path,
-    )
+    try:
+        supervise(
+            # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
+            ti=workload.ti,  # type: ignore[arg-type]
+            dag_rel_path=workload.dag_rel_path,
+            bundle_info=workload.bundle_info,
+            token=workload.token,
+            server=conf.get("core", "execution_api_server_url", fallback=default_execution_api_server),
+            log_path=workload.log_path,
+        )
+    except Exception as e:
+        if TaskAlreadyRunningError is not None and isinstance(e, TaskAlreadyRunningError):
+            log.info("[%s] Task already running elsewhere, ignoring redelivered message", celery_task_id)
Review Comment:
Thanks for the question. This is new for Airflow 3. In 2.x, the scheduler only
processed executor failure events for QUEUED tasks, so a redelivery didn't
affect running tasks. In 3.x, it also handles RUNNING, so a redelivered message
that hits a 409 gets recorded as a FAILURE by Celery, and the scheduler then
marks the still-running task as failed. Raising Ignore() prevents that.
Added a comment explaining this. Verified locally with a forced Redis
redelivery: without the fix, the task gets marked as failed; with it, the task
completes successfully.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]