r4um opened a new issue, #26520:
URL: https://github.com/apache/airflow/issues/26520

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   The change in https://github.com/apache/airflow/pull/3830 introduced 
multiprocessing while syncing task status. When using the Celery executor with 
an AWS SQS broker, DAG runs fail with the following backtrace in the scheduler 
logs:
   
   ```
   [2022-02-01 12:52:52,595] {celery_executor.py:299} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:
   b''
   Celery Task ID: TaskInstanceKey(dag_id='shared-dev_jobflow-airflow_k8s_job_multiple_tasks', task_id='t4', run_id='manual__2022-02-01T12:52:40.329850+00:00', try_number=1)
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 480, in _parse_xml_string_to_dom
       root = parser.close()
     File "<string>", line None
   xml.etree.ElementTree.ParseError: no element found: line 1, column 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
     File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 579, in apply_async
       **options
     File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 788, in send_task
       amqp.send_task_message(P, name, message, **options)
     File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 519, in send_task_message
       **properties
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 180, in publish
       exchange_name, declare, timeout
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 524, in _ensured
       return fun(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in _publish
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in <listcomp>
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 99, in maybe_declare
       return maybe_declare(entity, self.channel, retry, **retry_policy)
     File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 110, in maybe_declare
       return _maybe_declare(entity, channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 150, in _maybe_declare
       entity.declare(channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 606, in declare
       self._create_queue(nowait=nowait, channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 615, in _create_queue
       self.queue_declare(nowait=nowait, passive=False, channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 650, in queue_declare
       nowait=nowait,
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 528, in queue_declare
       return queue_declare_ok_t(queue, self._size(queue), 0)
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 607, in _size
       AttributeNames=['ApproximateNumberOfMessages'])
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
       return self._make_api_call(operation_name, kwargs)
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 695, in _make_api_call
       operation_model, request_dict, request_context)
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 714, in _make_request
       return self._endpoint.make_request(operation_model, request_dict)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request
       return self._send_request(request_dict, operation_model)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request
       request, operation_model, context)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 166, in _get_response
       request, operation_model)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 217, in _do_get_response
       response_dict, operation_model.output_shape)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse
       parsed = self._do_parse(response, shape)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 551, in _do_parse
       return self._parse_body_as_xml(response, shape, inject_metadata=True)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 555, in _parse_body_as_xml
       root = self._parse_xml_string_to_dom(xml_contents)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 485, in _parse_xml_string_to_dom
       (e, xml_string))
   botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:
   b''
   
   [2022-02-01 12:52:52,821] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2022-02-01 12:52:52,825] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2022-02-01 12:52:52,834] {taskinstance.py:1280} INFO - Marking task as FAILED. dag_id=shared-dev_jobflow-airflow_k8s_job_multiple_tasks, task_id=t4, execution_date=20220201T125240, start_date=, end_date=20220201T125252
   ```
   
   This is likely because boto3 session and resource objects are not thread 
safe[1]. With `sync_parallelism` set to `1`, the problem does not occur.
   
   [1] 
https://boto3.amazonaws.com/v1/documentation/api/1.14.31/guide/session.html#multithreading-or-multiprocessing-with-sessions
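
A minimal sketch of the `sync_parallelism` workaround, for anyone who wants to try it. Airflow reads configuration overrides from environment variables of the form `AIRFLOW__{SECTION}__{KEY}`, and `sync_parallelism` lives in the `[celery]` section, so the same setting can also be written as `sync_parallelism = 1` under `[celery]` in `airflow.cfg`. The helper name `scheduler_env` is hypothetical and only illustrates building the environment for the scheduler process.

```python
import os


def scheduler_env(base_env=None):
    """Return a copy of the environment with the workaround applied.

    `scheduler_env` is an illustrative helper, not part of Airflow.
    """
    env = dict(os.environ if base_env is None else base_env)
    # Equivalent to `sync_parallelism = 1` in the [celery] section of airflow.cfg.
    env["AIRFLOW__CELERY__SYNC_PARALLELISM"] = "1"
    return env


if __name__ == "__main__":
    env = scheduler_env()
    print(env["AIRFLOW__CELERY__SYNC_PARALLELISM"])
    # The scheduler could then be started with this environment, e.g.
    # subprocess.run(["airflow", "scheduler"], env=env)
```

This only sidesteps the failure; it presumably gives up whatever speed-up the parallel sync was meant to provide.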
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Use the Celery executor with AWS SQS as the Celery broker. Launch a DAG with 
multiple tasks and dependencies among them, e.g. five tasks with the following 
dependencies:
   
   ```
   t3 << [t1, t2]
   t4 << [t3]
   t5 << [t3]
   ```
   
   Task type doesn't matter. In our case, it is the KubernetesPodOperator.
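
To make the shape of that dependency graph concrete, here is a small stdlib-only Python sketch (it does not import Airflow). It groups the five tasks into "waves" of tasks whose upstream tasks have all finished. `UPSTREAM` and `runnable_waves` are illustrative names, not Airflow APIs, and the grouping is only a simplified model of which tasks can become runnable together.

```python
# Upstream dependencies copied from the snippet above:
# `t3 << [t1, t2]` means t1 and t2 must finish before t3 starts.
UPSTREAM = {
    "t1": [],
    "t2": [],
    "t3": ["t1", "t2"],
    "t4": ["t3"],
    "t5": ["t3"],
}


def runnable_waves(upstream):
    """Group tasks into waves; each task's upstreams all sit in earlier waves."""
    done = set()
    waves = []
    while len(done) < len(upstream):
        wave = sorted(
            task
            for task, deps in upstream.items()
            if task not in done and all(dep in done for dep in deps)
        )
        if not wave:
            raise ValueError("dependency cycle detected")
        waves.append(wave)
        done.update(wave)
    return waves


if __name__ == "__main__":
    print(runnable_waves(UPSTREAM))
```

Under this model, `t4` (the task that fails in the log above) becomes runnable at the same time as `t5`. That more than one task being queued together is what triggers the failure is an assumption, not something confirmed here.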
   
   ### Operating System
   
   CentOS Linux 7 (Core)
   
   ### Versions of Apache Airflow Providers
   
   airflow version 2.2.2
   Python version 3.7.9
   Using airflow constraints for 2.2.2 while installing packages.
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Deployed on AWS EKS in containers.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


