SameerMesiah97 commented on code in PR #63888:
URL: https://github.com/apache/airflow/pull/63888#discussion_r3030800973
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -369,27 +369,23 @@ def send_workload_to_executor(
try:
import redis.client # noqa: F401
except ImportError:
- pass # Redis not installed or not using Redis backend
+ pass # Redis not installed or not using Redis backend.
try:
with timeout(seconds=OPERATION_TIMEOUT):
- result = task_to_run.apply_async(args=args, queue=queue)
+ result = celery_task.apply_async(args=args, queue=queue)
except (Exception, AirflowTaskTimeout) as e:
exception_traceback = f"Celery Task ID:
{key}\n{traceback.format_exc()}"
result = ExceptionWithTraceback(e, exception_traceback)
# The type is right for the version, but the type cannot be defined
correctly for Airflow 2 and 3
- # concurrently;
+ # concurrently.
return key, args, result
-# Backward compatibility alias
-send_task_to_executor = send_workload_to_executor
Review Comment:
We no longer need the `send_task_to_executor` alias, as the functions in
`celery_executor.py` and `test_celery_executor.py` that were using it now
reference `send_workload_to_executor` instead after the changes introduced
in this PR. Accordingly, the `provider.info` and `provider.yaml` files have
now been updated.
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -102,7 +107,7 @@ class CeleryExecutor(BaseExecutor):
if TYPE_CHECKING:
if AIRFLOW_V_3_0_PLUS:
# TODO: TaskSDK: move this type change into BaseExecutor
- queued_tasks: dict[TaskInstanceKey, workloads.All] # type:
ignore[assignment]
+ queued_tasks: dict[WorkloadKey, workloads.All] # type:
ignore[assignment]
Review Comment:
I got an error I could not resolve without potentially overcomplicating the
typing when I attempted to implement your suggestion. Instead, I have now
followed what I did in PR #63035 here. This is the relevant block of code for
your reference:
if AIRFLOW_V_3_2_PLUS:
from airflow.executors.workloads.types import WorkloadKey as
_WorkloadKey
WorkloadKey: TypeAlias = _WorkloadKey
else:
WorkloadKey: TypeAlias = TaskInstanceKey # type: ignore[no-redef,
misc]
I believe it should satisfy your concern. You will notice the `WorkloadKey`
type alias is still being used, but the actual underlying type is
`TaskInstanceKey`, which is what should be used for `queued_tasks` on line 110
when the Airflow version is at least 3.0 but earlier than 3.2.
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -136,149 +141,151 @@ def __init__(self, *args, **kwargs):
from airflow.providers.celery.executors.celery_executor_utils import
BulkStateFetcher
self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism,
celery_app=self.celery_app)
- self.tasks = {}
- self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
- self.task_publish_max_retries = self.conf.getint("celery",
"task_publish_max_retries", fallback=3)
+ self.workloads: dict[WorkloadKey, AsyncResult] = {}
+ self.workload_publish_retries: Counter[WorkloadKey] = Counter()
+ self.workload_publish_max_retries = self.conf.getint("celery",
"task_publish_max_retries", fallback=3)
def start(self) -> None:
self.log.debug("Starting Celery Executor using %s processes for
syncing", self._sync_parallelism)
- def _num_tasks_per_send_process(self, to_send_count: int) -> int:
+ def _num_workloads_per_send_process(self, to_send_count: int) -> int:
"""
- How many Celery tasks should each worker process send.
+ How many Celery workloads should each worker process send.
- :return: Number of tasks that should be sent per process
+ :return: Number of workloads that should be sent per process
"""
return max(1, math.ceil(to_send_count / self._sync_parallelism))
def _process_tasks(self, task_tuples: Sequence[TaskTuple]) -> None:
- # Airflow V2 version
+ # Airflow V2 compatibility path — converts task tuples into
workload-compatible tuples.
task_tuples_to_send = [task_tuple[:3] + (self.team_name,) for
task_tuple in task_tuples]
- self._send_tasks(task_tuples_to_send)
+ self._send_workloads(task_tuples_to_send)
def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
- # Airflow V3 version -- have to delay imports until we know we are on
v3
+ # Airflow V3 version -- have to delay imports until we know we are on
v3.
from airflow.executors.workloads import ExecuteTask
if AIRFLOW_V_3_2_PLUS:
from airflow.executors.workloads import ExecuteCallback
- tasks: list[WorkloadInCelery] = []
+ workloads_to_be_sent: list[WorkloadInCelery] = []
for workload in workloads:
if isinstance(workload, ExecuteTask):
- tasks.append((workload.ti.key, workload, workload.ti.queue,
self.team_name))
+ workloads_to_be_sent.append((workload.ti.key, workload,
workload.ti.queue, self.team_name))
elif AIRFLOW_V_3_2_PLUS and isinstance(workload, ExecuteCallback):
- # Use default queue for callbacks, or extract from callback
data if available
+ # Use default queue for callbacks, or extract from callback
data if available.
queue = "default"
if isinstance(workload.callback.data, dict) and "queue" in
workload.callback.data:
queue = workload.callback.data["queue"]
- tasks.append((workload.callback.key, workload, queue,
self.team_name))
+ workloads_to_be_sent.append((workload.callback.key, workload,
queue, self.team_name))
else:
raise ValueError(f"{type(self)}._process_workloads cannot
handle {type(workload)}")
- self._send_tasks(tasks)
+ self._send_workloads(workloads_to_be_sent)
- def _send_tasks(self, task_tuples_to_send: Sequence[WorkloadInCelery]):
+ def _send_workloads(self, workload_tuples_to_send:
Sequence[WorkloadInCelery]):
# Celery state queries will be stuck if we do not use one same backend
- # for all tasks.
+ # for all workloads.
cached_celery_backend = self.celery_app.backend
- key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
- self.log.debug("Sent all tasks.")
+ key_and_async_results =
self._send_workloads_to_celery(workload_tuples_to_send)
+ self.log.debug("Sent all workloads.")
from airflow.providers.celery.executors.celery_executor_utils import
ExceptionWithTraceback
for key, _, result in key_and_async_results:
if isinstance(result, ExceptionWithTraceback) and isinstance(
result.exception, AirflowTaskTimeout
):
- retries = self.task_publish_retries[key]
- if retries < self.task_publish_max_retries:
+ retries = self.workload_publish_retries[key]
+ if retries < self.workload_publish_max_retries:
Stats.incr("celery.task_timeout_error")
self.log.info(
- "[Try %s of %s] Task Timeout Error for Task: (%s).",
- self.task_publish_retries[key] + 1,
- self.task_publish_max_retries,
+ "[Try %s of %s] Task Timeout Error for Workload:
(%s).",
+ self.workload_publish_retries[key] + 1,
+ self.workload_publish_max_retries,
tuple(key),
)
- self.task_publish_retries[key] = retries + 1
+ self.workload_publish_retries[key] = retries + 1
continue
if key in self.queued_tasks:
self.queued_tasks.pop(key)
else:
self.queued_callbacks.pop(key, None)
- self.task_publish_retries.pop(key, None)
+ self.workload_publish_retries.pop(key, None)
if isinstance(result, ExceptionWithTraceback):
self.log.error("%s: %s\n%s\n", CELERY_SEND_ERR_MSG_HEADER,
result.exception, result.traceback)
self.event_buffer[key] = (TaskInstanceState.FAILED, None)
elif result is not None:
result.backend = cached_celery_backend
self.running.add(key)
- self.tasks[key] = result
+ self.workloads[key] = result
- # Store the Celery task_id in the event buffer. This will get
"overwritten" if the task
+ # Store the Celery task_id (workload execution ID) in the
event buffer. This will get "overwritten" if the task
# has another event, but that is fine, because the only other
events are success/failed at
- # which point we don't need the ID anymore anyway
+ # which point we don't need the ID anymore anyway.
self.event_buffer[key] = (TaskInstanceState.QUEUED,
result.task_id)
- def _send_tasks_to_celery(self, task_tuples_to_send:
Sequence[WorkloadInCelery]):
- from airflow.providers.celery.executors.celery_executor_utils import
send_task_to_executor
+ def _send_workloads_to_celery(self, workload_tuples_to_send:
Sequence[WorkloadInCelery]):
+ from airflow.providers.celery.executors.celery_executor_utils import
send_workload_to_executor
- if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
+ if len(workload_tuples_to_send) == 1 or self._sync_parallelism == 1:
# One tuple, or max one process -> send it in the main thread.
- return list(map(send_task_to_executor, task_tuples_to_send))
+ return list(map(send_workload_to_executor,
workload_tuples_to_send))
# Use chunks instead of a work queue to reduce context switching
- # since tasks are roughly uniform in size
- chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
- num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
+ # since workloads are roughly uniform in size.
+ chunksize =
self._num_workloads_per_send_process(len(workload_tuples_to_send))
+ num_processes = min(len(workload_tuples_to_send),
self._sync_parallelism)
- # Use ProcessPoolExecutor with team_name instead of task objects to
avoid pickling issues.
+ # Use ProcessPoolExecutor with team_name instead of workload objects
to avoid pickling issues.
# Subprocesses reconstruct the team-specific Celery app from the team
name and existing config.
with ProcessPoolExecutor(max_workers=num_processes) as send_pool:
key_and_async_results = list(
- send_pool.map(send_task_to_executor, task_tuples_to_send,
chunksize=chunksize)
+ send_pool.map(send_workload_to_executor,
workload_tuples_to_send, chunksize=chunksize)
)
return key_and_async_results
def sync(self) -> None:
- if not self.tasks:
- self.log.debug("No task to query celery, skipping sync")
+ if not self.workloads:
+ self.log.debug("No workload to query celery, skipping sync")
return
- self.update_all_task_states()
+ self.update_all_workload_states()
def debug_dump(self) -> None:
"""Debug dump; called in response to SIGUSR2 by the scheduler."""
super().debug_dump()
self.log.info(
- "executor.tasks (%d)\n\t%s", len(self.tasks),
"\n\t".join(map(repr, self.tasks.items()))
+ "executor.workloads (%d)\n\t%s",
+ len(self.workloads),
+ "\n\t".join(map(repr, self.workloads.items())),
)
- def update_all_task_states(self) -> None:
- """Update states of the tasks."""
- self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
- state_and_info_by_celery_task_id =
self.bulk_state_fetcher.get_many(self.tasks.values())
+ def update_all_workload_states(self) -> None:
+ """Update states of the workloads."""
+ self.log.debug("Inquiring about %s celery workload(s)",
len(self.workloads))
+ state_and_info_by_celery_task_id =
self.bulk_state_fetcher.get_many(self.workloads.values())
self.log.debug("Inquiries completed.")
- for key, async_result in list(self.tasks.items()):
+ for key, async_result in list(self.workloads.items()):
state, info =
state_and_info_by_celery_task_id.get(async_result.task_id)
if state:
- self.update_task_state(key, state, info)
+ self.update_workload_state(key, state, info)
def change_state(
self, key: TaskInstanceKey, state: TaskInstanceState, info=None,
remove_running=True
) -> None:
super().change_state(key, state, info, remove_running=remove_running)
- self.tasks.pop(key, None)
+ self.workloads.pop(key, None)
- def update_task_state(self, key: TaskInstanceKey, state: str, info: Any)
-> None:
- """Update state of a single task."""
+ def update_workload_state(self, key: WorkloadKey, state: str, info: Any)
-> None:
Review Comment:
The terminology changes to this function have been reverted. However, the
terminology changes for `update_all_workload_states` have been kept, as that
function operates on the `workloads` member. This should not be misleading,
since only `update_task_state` is called within it, which implies that
workload state updates are not yet supported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]