[ https://issues.apache.org/jira/browse/AIRFLOW-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kaxil Naik updated AIRFLOW-6527:
--------------------------------
Issue Type: Improvement (was: Bug)
> Error sending Celery task:Timeout in send_task_to_executor
> ----------------------------------------------------------
>
> Key: AIRFLOW-6527
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6527
> Project: Apache Airflow
> Issue Type: Improvement
> Components: scheduler
> Affects Versions: 1.10.7
> Reporter: Qian Yu
> Priority: Major
> Fix For: 2.0.0, 1.10.8
>
>
> We use Airflow with CeleryExecutor and a Redis broker. Our Airflow scheduler
> often encounters this {{AirflowTaskTimeout}} error.
> - This happens in {{send_task_to_executor()}}.
> - It only happens occasionally.
> - Retrying the failed task a few times always works.
> - This affects at least 1.10.6 and 1.10.7 and possibly other versions too.
> Possible cause:
> Our Airflow venv and dags_folder are on an NFS mount because we want to keep
> the various Airflow services in sync across hosts.
> The NFS mount can sometimes be slow. This makes module imports slow (the
> traceback below shows the timeout firing while Python is still importing
> celery.app.amqp) and can push {{send_task_to_executor()}} past its 2-second limit.
> Other people with similar-looking problems:
> The following issue is now closed. It's not clear to me whether or how the
> user resolved it.
> https://github.com/bitnami/bitnami-docker-airflow-scheduler/issues/1
> Another user asked a question on the mailing list. It has not been answered.
> https://www.mail-archive.com/[email protected]/msg01093.html
> Proposed workarounds:
> - Make this `timeout(seconds=2)` configurable, e.g. by adding a
> send_task_timeout option to the [celery] section of airflow.cfg. Since 2 seconds
> seems too short, we could set it to something like 15 seconds, making the error
> much less likely.
> - Move the Airflow venv to local disk. This, however, makes it inconvenient to
> keep the Airflow installation in sync across multiple hosts.
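A minimal sketch of the first workaround, using only the standard library's `configparser`. The `send_task_timeout` option name is the one proposed above, not an existing Airflow setting, and `get_send_task_timeout` is a hypothetical helper; inside Airflow the value would presumably be read through Airflow's own configuration object instead.

```python
import configparser

# The value currently hardcoded in send_task_to_executor()
DEFAULT_SEND_TASK_TIMEOUT = 2.0


def get_send_task_timeout(cfg_text):
    """Read the proposed (hypothetical) [celery] send_task_timeout option.

    Falls back to the current 2-second behavior when the option is absent.
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # getfloat(..., fallback=...) returns the fallback when either the
    # section or the option is missing, instead of raising
    return parser.getfloat("celery", "send_task_timeout",
                           fallback=DEFAULT_SEND_TASK_TIMEOUT)
```

For example, a file whose `[celery]` section contains `send_task_timeout = 15` yields `15.0`, while a file without the option keeps the current 2-second behavior.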
> {code}
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,763] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: Celery Task ID: ('example_daily', 'example_sensor1', datetime.datetime(2020, 1, 9, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: return obj.__dict__[self.__name__]
> Jan 09 22:46:59 scheduler_host airflow[18882]: KeyError: 'amqp'
> Jan 09 22:46:59 scheduler_host airflow[18882]: During handling of the above exception, another exception occurred:
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
> Jan 09 22:46:59 scheduler_host airflow[18882]: result = task.apply_async(args=[command], queue=queue)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/task.py", line 570, in apply_async
> Jan 09 22:46:59 scheduler_host airflow[18882]: **options
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 712, in send_task
> Jan 09 22:46:59 scheduler_host airflow[18882]: amqp = self.amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: value = obj.__dict__[self.__name__] = self.__get(obj)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 1202, in amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: return instantiate(self.amqp_cls, app=self)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/utils/imports.py", line 55, in instantiate
> Jan 09 22:46:59 scheduler_host airflow[18882]: return symbol_by_name(name)(*args, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/imports.py", line 57, in symbol_by_name
> Jan 09 22:46:59 scheduler_host airflow[18882]: module = imp(module_name, package=package, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: return _bootstrap._gcd_import(name[level:], package, level)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 994, in _gcd_import
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 678, in exec_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/amqp.py", line 23, in <module>
> Jan 09 22:46:59 scheduler_host airflow[18882]: from . import routes as _routes
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 951, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap>", line 894, in _find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1157, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1129, in _get_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 1271, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 96, in _path_isfile
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 88, in _path_is_mode_type
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen importlib._bootstrap_external>", line 82, in _path_stat
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
> Jan 09 22:46:59 scheduler_host airflow[18882]: raise AirflowTaskTimeout(self.error_message)
> Jan 09 22:46:59 scheduler_host airflow[18882]: airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,764] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27725
> {code}
> This is the code that raises the error. Note that timeout(seconds=2) is hardcoded:
> {code:python}
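> import traceback
>
> from airflow.utils.timeout import timeout
>
>
> def send_task_to_executor(task_tuple):
>     key, simple_ti, command, queue, task = task_tuple
>     try:
>         # Hardcoded 2-second limit on submitting the task to the Celery broker
>         with timeout(seconds=2):
>             result = task.apply_async(args=[command], queue=queue)
>     except Exception as e:
>         exception_traceback = "Celery Task ID: {}\n{}".format(
>             key, traceback.format_exc())
>         # Return the error together with its traceback instead of raising,
>         # so the caller can log it ("Error sending Celery task: ...")
>         result = ExceptionWithTraceback(e, exception_traceback)
>     return key, command, result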
> {code}
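A minimal sketch of the proposed change, assuming the limit is simply passed in as a parameter instead of being hardcoded. Everything here is hypothetical and self-contained: `alarm_timeout` is a re-implementation of a SIGALRM-based timeout (the traceback shows Airflow's own `timeout` raising from a signal handler while an import is still running), `SendTaskTimeout` stands in for `AirflowTaskTimeout`, `ExceptionWithTraceback` is a local stand-in for the class used in the quoted code, and `SlowTask` is a stub whose `apply_async` stalls the way a slow NFS import might. It only works on Unix and in the main thread.

```python
import os
import signal
import time
import traceback
from contextlib import contextmanager


class SendTaskTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


@contextmanager
def alarm_timeout(seconds):
    """Raise SendTaskTimeout if the body runs longer than `seconds`.

    Unix-only, main thread only (relies on SIGALRM).
    """
    def handle_timeout(signum, frame):
        raise SendTaskTimeout("Timeout, PID: {}".format(os.getpid()))

    old_handler = signal.signal(signal.SIGALRM, handle_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # accepts fractional seconds
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)


class ExceptionWithTraceback:
    """Local stand-in: pairs an exception with its formatted traceback."""
    def __init__(self, exception, exception_traceback):
        self.exception = exception
        self.traceback = exception_traceback


def send_task_to_executor(task_tuple, send_task_timeout=2.0):
    """Same shape as the quoted function, but with a configurable limit."""
    key, simple_ti, command, queue, task = task_tuple
    try:
        with alarm_timeout(send_task_timeout):
            result = task.apply_async(args=[command], queue=queue)
    except Exception as e:
        exception_traceback = "Celery Task ID: {}\n{}".format(
            key, traceback.format_exc())
        result = ExceptionWithTraceback(e, exception_traceback)
    return key, command, result


class SlowTask:
    """Stub task whose apply_async stalls for `delay` seconds before succeeding."""
    def __init__(self, delay):
        self.delay = delay

    def apply_async(self, args, queue):
        time.sleep(self.delay)
        return "sent {} to {}".format(args, queue)
```

With a stub that stalls for 0.3 seconds, a 0.1-second limit returns an `ExceptionWithTraceback` wrapping `SendTaskTimeout`, while a 1-second limit lets the same call succeed; this is the effect the issue expects from raising the limit from 2 to something like 15 seconds.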
--
This message was sent by Atlassian Jira
(v8.3.4#803005)