This is an automated email from the ASF dual-hosted git repository. kamilbregula pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 0801322 Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248) 0801322 is described below commit 080132254b06127a6e2e8a2e23ceed6a7859d498 Author: Kamil BreguĊa <mik-...@users.noreply.github.com> AuthorDate: Tue Jul 27 11:50:18 2021 +0200 Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248) * Dont use TaskInstance in CeleryExecutor.trigger_tasks * fixup! Merge branch 'main' into celery-multiprocessing-2 --- airflow/executors/celery_executor.py | 14 +++++++------- tests/executors/test_celery_executor.py | 6 ++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index b2c5016..e3fc398 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -46,7 +46,7 @@ from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowTaskTimeout from airflow.executors.base_executor import BaseExecutor, CommandType, EventBufferValueType -from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey +from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname @@ -154,15 +154,15 @@ class ExceptionWithTraceback: # Task instance that is sent over Celery queues -# TaskInstanceKey, SimpleTaskInstance, Command, queue_name, CallableTask -TaskInstanceInCelery = Tuple[TaskInstanceKey, SimpleTaskInstance, CommandType, Optional[str], Task] +# TaskInstanceKey, Command, queue_name, CallableTask +TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task] def send_task_to_executor( task_tuple: TaskInstanceInCelery, ) -> Tuple[TaskInstanceKey, CommandType, Union[AsyncResult, ExceptionWithTraceback]]: """Sends task to executor.""" - key, _, command, queue, task_to_run = task_tuple + key, command, queue, task_to_run = task_tuple try: with timeout(seconds=OPERATION_TIMEOUT): result = task_to_run.apply_async(args=[command], queue=queue) @@ -250,8 +250,8 @@ class CeleryExecutor(BaseExecutor): task_tuples_to_send: List[TaskInstanceInCelery] = [] for _ in range(min(open_slots, len(self.queued_tasks))): - key, (command, _, queue, simple_ti) = sorted_queue.pop(0) - task_tuple = (key, simple_ti, command, queue, execute_command) + key, (command, _, queue, _) = sorted_queue.pop(0) + task_tuple = (key, command, queue, execute_command) task_tuples_to_send.append(task_tuple) if key not in self.task_publish_retries: self.task_publish_retries[key] = 1 @@ -260,7 +260,7 @@ class CeleryExecutor(BaseExecutor): self._process_tasks(task_tuples_to_send) def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> None: - first_task = next(t[4] for t in task_tuples_to_send) + first_task = next(t[3] for t in task_tuples_to_send) # Celery state queries will stuck if we do not use one same backend # for all tasks. diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 88ea95c..cdda8ee 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -126,14 +126,12 @@ class TestCeleryExecutor(unittest.TestCase): task_tuples_to_send = [ ( ('success', 'fake_simple_ti', execute_date, 0), - None, success_command, celery_executor.celery_configuration['task_default_queue'], celery_executor.execute_command, ), ( ('fail', 'fake_simple_ti', execute_date, 0), - None, fail_command, celery_executor.celery_configuration['task_default_queue'], celery_executor.execute_command, @@ -141,8 +139,8 @@ class TestCeleryExecutor(unittest.TestCase): ] # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict - for (key, simple_ti, command, queue, task) in task_tuples_to_send: - executor.queued_tasks[key] = (command, 1, queue, simple_ti) + for (key, command, queue, task) in task_tuples_to_send: + executor.queued_tasks[key] = (command, 1, queue, None) executor.task_publish_retries[key] = 1 executor._process_tasks(task_tuples_to_send)