[ 
https://issues.apache.org/jira/browse/AIRFLOW-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013669#comment-17013669
 ] 

ASF GitHub Bot commented on AIRFLOW-6527:
-----------------------------------------

yuqian90 commented on pull request #7143: [AIRFLOW-6527] Make 
send_task_to_executor timeout configurable
URL: https://github.com/apache/airflow/pull/7143
 
 
   This change makes the timeout for sending tasks to the Celery worker 
configurable. If people encounter the Celery timeout error described in 
AIRFLOW-6527, they can increase `[celery]send_task_timeout` to make the timeout 
less likely to happen.
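
A minimal, self-contained sketch of the idea follows. It is not the code from this pull request: it uses only the Python standard library, and `EXAMPLE_CFG`, `SendTimeout`, `load_send_task_timeout`, `send_with_timeout` and `slow_send` are hypothetical names made up for illustration. Only the option name `[celery]send_task_timeout` and the previous 2-second limit come from the issue. The signal-based timer is an assumption about how such a limit can be enforced, and it works only on Unix in the main thread.

```python
import configparser
import signal
import time
from contextlib import contextmanager

# Hypothetical stand-in for airflow.cfg; only the option name comes from the PR.
EXAMPLE_CFG = """
[celery]
send_task_timeout = 15
"""


class SendTimeout(Exception):
    """Illustrative exception; the issue shows Airflow raising AirflowTaskTimeout."""


@contextmanager
def timeout(seconds):
    """Raise SendTimeout if the body runs longer than `seconds` (Unix, main thread)."""
    def handle_timeout(signum, frame):
        raise SendTimeout("Timeout after {}s".format(seconds))

    previous = signal.signal(signal.SIGALRM, handle_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def load_send_task_timeout(cfg_text, default=2.0):
    """Read [celery]send_task_timeout, falling back to the old 2-second limit."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getfloat("celery", "send_task_timeout", fallback=default)


def send_with_timeout(send, seconds):
    """Call `send()` under the time limit; return its result or the exception."""
    try:
        with timeout(seconds):
            return send()
    except Exception as exc:  # mirror the issue's pattern of returning the error
        return exc


def slow_send(delay):
    """Simulate a send that is slowed down, e.g. by imports from a slow NFS mount."""
    time.sleep(delay)
    return "sent"
```

With the configured value, a call such as `send_with_timeout(lambda: slow_send(3), load_send_task_timeout(EXAMPLE_CFG))` has 15 seconds to finish instead of 2, so a send delayed by a few seconds would return normally rather than as a timeout error.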
   
   ---
   Issue link: WILL BE INSERTED BY 
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = 
JIRA ID<sup>*</sup>
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with 
`[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error sending Celery task:Timeout in send_task_to_executor
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-6527
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6527
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>
> We use Airflow with CeleryExecutor and a Redis broker. Our Airflow scheduler 
> often encounters this {{AirflowTaskTimeout}} error. 
> - This happens in {{send_task_to_executor()}}. 
> - It only happens occasionally. 
> - Retrying the failed task a few times always works.
> - This affects at least 1.10.6 and 1.10.7, and possibly other versions too. 
>
> Possible cause:
> Our Airflow venv and dags_folder are on an NFS mount because we want to keep 
> the various Airflow services in sync. 
> The NFS mount can be slow at times. This makes the import slow and 
> causes {{send_task_to_executor()}} to take more than 2 seconds.
>
> Other people with similar-looking problems:
> The following issue is now closed. It's not clear to me whether or how the 
> user resolved it.
> https://github.com/bitnami/bitnami-docker-airflow-scheduler/issues/1
> Another user asked a question on the mailing list. It has not been answered.
> https://www.mail-archive.com/dev@airflow.apache.org/msg01093.html
>
> Proposed workaround:
> - Make this `timeout(seconds=2)` configurable, e.g. by adding a 
> [celery]send_task_timeout option to airflow.cfg. Since 2 seconds seems too 
> short, we can set it to something like 15 seconds to make the timeout much 
> less likely to happen.
> - Move the Airflow venv to the local disk. This makes it inconvenient to sync 
> the Airflow installation across multiple hosts, though.
>
> {code}
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,763] 
> \{celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 
> 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: Celery Task ID: 
> ('example_daily', 'example_sensor1', datetime.datetime(2020, 1, 9, 0, 0, 
> tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call 
> last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", 
> line 42, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: return 
> obj.__dict__[self.__name__]
> Jan 09 22:46:59 scheduler_host airflow[18882]: KeyError: 'amqp'
> Jan 09 22:46:59 scheduler_host airflow[18882]: During handling of the above 
> exception, another exception occurred:
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call 
> last):
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py",
>  line 118, in send_task_to_executor
> Jan 09 22:46:59 scheduler_host airflow[18882]: result = 
> task.apply_async(args=[command], queue=queue)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/task.py", line 
> 570, in apply_async
> Jan 09 22:46:59 scheduler_host airflow[18882]: **options
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 
> 712, in send_task
> Jan 09 22:46:59 scheduler_host airflow[18882]: amqp = self.amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", 
> line 44, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]: value = 
> obj.__dict__[self.__name__] = self.__get(obj)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 
> 1202, in amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]: return 
> instantiate(self.amqp_cls, app=self)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/utils/imports.py", 
> line 55, in instantiate
> Jan 09 22:46:59 scheduler_host airflow[18882]: return 
> symbol_by_name(name)(*args, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/imports.py", 
> line 57, in symbol_by_name
> Jan 09 22:46:59 scheduler_host airflow[18882]: module = imp(module_name, 
> package=package, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: return 
> _bootstrap._gcd_import(name[level:], package, level)
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 994, in _gcd_import
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 955, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 665, in _load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 678, in exec_module
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 219, in _call_with_frames_removed
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/amqp.py", line 
> 23, in <module>
> Jan 09 22:46:59 scheduler_host airflow[18882]: from . import routes as _routes
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 951, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap>", line 894, in _find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 1157, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 1129, in _get_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 1271, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 96, in _path_isfile
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 88, in _path_is_mode_type
> Jan 09 22:46:59 scheduler_host airflow[18882]: File "<frozen 
> importlib._bootstrap_external>", line 82, in _path_stat
> Jan 09 22:46:59 scheduler_host airflow[18882]: File 
> "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py",
>  line 43, in handle_timeout
> Jan 09 22:46:59 scheduler_host airflow[18882]: raise 
> AirflowTaskTimeout(self.error_message)
> Jan 09 22:46:59 scheduler_host airflow[18882]: 
> airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,764] 
> \{celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 
> 27725
> {code}
> This is the code that causes this. The timeout(seconds=2) is hardcoded:
> {code:python}
> def send_task_to_executor(task_tuple):
>     key, simple_ti, command, queue, task = task_tuple
>     try:
>         with timeout(seconds=2):
>             result = task.apply_async(args=[command], queue=queue)
>     except Exception as e:
>         exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())
>         result = ExceptionWithTraceback(e, exception_traceback)
>     return key, command, result
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
