r4um opened a new issue, #26520: URL: https://github.com/apache/airflow/issues/26520
### Apache Airflow version

Other Airflow 2 version

### What happened

The change in https://github.com/apache/airflow/pull/3830 introduced multiprocessing while syncing task status. When using the celery executor with an AWS SQS broker, DAG runs fail with the following traceback in the scheduler logs.

```
[2022-02-01 12:52:52,595] {celery_executor.py:299} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed: b''
Celery Task ID: TaskInstanceKey(dag_id='shared-dev_jobflow-airflow_k8s_job_multiple_tasks', task_id='t4', run_id='manual__2022-02-01T12:52:40.329850+00:00', try_number=1)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 480, in _parse_xml_string_to_dom
    root = parser.close()
  File "<string>", line None
xml.etree.ElementTree.ParseError: no element found: line 1, column 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 579, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 788, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 519, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 180, in publish
    exchange_name, declare, timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 524, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 99, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 110, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 150, in _maybe_declare
    entity.declare(channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 650, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 528, in queue_declare
    return queue_declare_ok_t(queue, self._size(queue), 0)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 607, in _size
    AttributeNames=['ApproximateNumberOfMessages'])
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 695, in _make_api_call
    operation_model, request_dict, request_context)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 714, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request
    request, operation_model, context)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 166, in _get_response
    request, operation_model)
  File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 217, in _do_get_response
    response_dict, operation_model.output_shape)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse
    parsed = self._do_parse(response, shape)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 551, in _do_parse
    return self._parse_body_as_xml(response, shape, inject_metadata=True)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 555, in _parse_body_as_xml
    root = self._parse_xml_string_to_dom(xml_contents)
  File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 485, in _parse_xml_string_to_dom
    (e, xml_string))
botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed: b''
[2022-02-01 12:52:52,821] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-02-01 12:52:52,825] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-02-01 12:52:52,834] {taskinstance.py:1280} INFO - Marking task as FAILED. dag_id=shared-dev_jobflow-airflow_k8s_job_multiple_tasks, task_id=t4, execution_date=20220201T125240, start_date=, end_date=20220201T125252
```

This is likely because boto3 session and resource objects are not thread safe[1]. With `sync_parallelism` set to `1`, the problem does not occur.
[1] https://boto3.amazonaws.com/v1/documentation/api/1.14.31/guide/session.html#multithreading-or-multiprocessing-with-sessions

### What you think should happen instead

_No response_

### How to reproduce

Use the celery executor with AWS SQS as the celery broker. Launch a DAG with multiple tasks and dependencies among them, e.g. 5 tasks with dependencies as

```
t3 << [t1, t2]
t4 << [t3]
t5 << [t3]
```

Task type doesn't matter; in our case it is the KubernetesPodOperator.

### Operating System

CentOS Linux 7 (Core)

### Versions of Apache Airflow Providers

Airflow version 2.2.2, Python version 3.7.9, using the Airflow constraints for 2.2.2 while installing packages.

### Deployment

Other

### Deployment details

Deployed on AWS EKS via containers.

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
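The boto3 guidance linked in [1] above is to create a separate session per worker rather than sharing one created in the parent. A minimal, runnable sketch of that per-worker-session pattern follows; `FakeSession`, `get_session`, and `queue_size` are hypothetical stand-ins (not boto3 or kombu APIs) so the sketch runs without AWS credentials. The same idea applies to process pools via a pool initializer.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class FakeSession:
    """Hypothetical stand-in for boto3.session.Session, which should not
    be shared across threads or processes."""

    def client(self, service):
        # Tag the client with the creating thread so we can observe that
        # each worker derived its client from its own session.
        return (service, threading.get_ident())

_local = threading.local()

def get_session():
    # One session per worker, created lazily on first use, instead of a
    # single module-level session shared by every worker.
    if not hasattr(_local, "session"):
        _local.session = FakeSession()
    return _local.session

def queue_size(queue_name):
    # Each unit of work uses its worker-local session, analogous to the
    # scheduler's sync workers each polling SQS queue attributes.
    sqs = get_session().client("sqs")
    return queue_name, sqs

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(queue_size, [f"q{i}" for i in range(8)]))

print(len(results))  # 8 queues polled, each via a worker-local session
```

With a `multiprocessing.Pool`, the equivalent is passing an `initializer=` that builds the session inside each child process, so no boto3 object created before the fork is ever reused afterwards.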
