Sha-of-Lazy opened a new issue, #23207:
URL: https://github.com/apache/airflow/issues/23207

   ### Apache Airflow version
   
   2.2.5 (latest released)
   
   ### What happened
   
   The DAG run randomly (about 5% of the time) failed when scheduled. In the web UI the run showed as 'failed' while its tasks showed 'Not yet started'.
   While checking the logs, we found the error below in the scheduler's stdout:
   
   ```
   [2022-04-25 07:00:00,457] {scheduler_job.py:548} INFO - Sending TaskInstanceKey(dag_id='my_dag_id', task_id='my_dag_id_1', run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1) to executor with priority 1 and queue default
   [2022-04-25 07:00:00,457] {base_executor.py:85} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'my_dag_id', 'my_dag_id_1', 'scheduled__2022-04-24T22:55:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/auto_generated_my_dag_id.py']
   [2022-04-25 07:00:00,618] {celery_executor.py:299} ERROR - Error sending Celery task: This result object does not return rows. It has been closed automatically.
   Celery Task ID: TaskInstanceKey(dag_id='my_dag_id', task_id='my_dag_id_1', run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1)
   Traceback (most recent call last):
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 779, in _getter
       getter = self._metadata._getter
   AttributeError: 'NoneType' object has no attribute '_getter'

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/usr/local/python3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
     File "/usr/local/python3/lib/python3.6/site-packages/celery/app/task.py", line 576, in apply_async
       **options
     File "/usr/local/python3/lib/python3.6/site-packages/celery/app/base.py", line 767, in send_task
       amqp.send_task_message(P, name, message, **options)
     File "/usr/local/python3/lib/python3.6/site-packages/celery/app/amqp.py", line 519, in send_task_message
       **properties
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 180, in publish
       exchange_name, declare, timeout
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/connection.py", line 525, in _ensured
       return fun(*args, **kwargs)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 193, in _publish
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 193, in <listcomp>
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 99, in maybe_declare
       return maybe_declare(entity, self.channel, retry, **retry_policy)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/common.py", line 119, in maybe_declare
       return _maybe_declare(entity, channel)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/common.py", line 159, in _maybe_declare
       entity.declare(channel=channel)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 606, in declare
       self._create_queue(nowait=nowait, channel=channel)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 615, in _create_queue
       self.queue_declare(nowait=nowait, passive=False, channel=channel)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 650, in queue_declare
       nowait=nowait,
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 529, in queue_declare
       return queue_declare_ok_t(queue, self._size(queue), 0)
     File "/usr/local/python3/lib/python3.6/site-packages/kombu/transport/sqlalchemy/__init__.py", line 196, in _size
       return self._query_all(queue).count()
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3803, in count
       return self.from_self(col).scalar()
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3523, in scalar
       ret = self.one()
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3490, in one
       ret = self.one_or_none()
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3459, in one_or_none
       ret = list(self)
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
       cursor.close()
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 60, in instances
       for query_entity in query._entities
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 60, in <listcomp>
       for query_entity in query._entities
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 4849, in row_processor
       getter = result._getter(column)
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 781, in _getter
       return self._non_result(None, err)
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 1241, in _non_result
       replace_context=err,
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
   sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically.
   ```
   
   ```
   [2022-04-25 07:00:00,628] {scheduler_job.py:608} INFO - Executor reports execution of my_dag_id.my_dag_id_1 run_id=scheduled__2022-04-24T22:55:00+00:00 exited with status failed for try_number 1
   ```
   
   and in some other cases it looks like:

   ```
   Traceback (most recent call last):
     File "/usr/local/python3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
     ******************* similar trace stacks ***********************
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 704, in _getter
       ret = self._key_fallback(key, None, raiseerr)
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 686, in _key_fallback
       replace_context=err,
     File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
   sqlalchemy.exc.NoSuchColumnError: "Could not locate column in row for column 'count(*)'"
   ```
   
   or:
   
   >  "Could not locate column in row for column 'kombu_queue.id'"
   
   Basically, this looks like a bug in the interaction between the Celery executor and SQLAlchemy.
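
   The bottom frames of the traceback are kombu's SQLAlchemy broker transport counting pending messages before declaring a queue. As a rough, hedged illustration of that failing query path (the schema here is a simplified stand-in, and stdlib sqlite3 replaces the real MySQL broker database):

   ```python
   # Rough stand-in for what kombu's sqlalchemy transport does in _size()
   # (the bottom frames of the traceback): count the visible messages for
   # a queue before declaring it. Table and column names are simplified
   # illustrations, not kombu's exact schema.
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.execute("CREATE TABLE kombu_queue (id INTEGER PRIMARY KEY, name TEXT)")
   conn.execute(
       "CREATE TABLE kombu_message "
       "(id INTEGER PRIMARY KEY, queue_id INTEGER, visible INTEGER)"
   )
   conn.execute("INSERT INTO kombu_queue (id, name) VALUES (1, 'default')")
   conn.executemany(
       "INSERT INTO kombu_message (queue_id, visible) VALUES (?, ?)",
       [(1, 1), (1, 1), (1, 0)],
   )

   # The query that fails in the traceback is essentially this COUNT; the
   # errors above suggest its result metadata disappeared mid-read.
   size = conn.execute(
       "SELECT count(*) FROM kombu_message WHERE queue_id = ? AND visible = 1",
       (1,),
   ).fetchone()[0]
   print(size)  # 2 visible messages queued
   ```

   For what it's worth, SQLAlchemy's multiprocessing guidance warns that database connections shared across forked processes can corrupt result state in exactly this confusing way; with schedulers on every node and prefork workers, that may be one failure mode worth ruling out, though I can't confirm it is the cause here.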
   
   ### What you think should happen instead
   
   The task should be scheduled successfully.
   
   ### How to reproduce
   
   The error was happening continuously in my environment. Here are the configs that I think are relevant to reproducing it:
   12 nodes (Docker), each running both a worker and a scheduler, all connected to a single MySQL 8 database.
   
   ```
   executor = CeleryExecutor

   [celery]
   worker_concurrency = 4
   broker_url, result_backend: the same SQLAlchemy connection to a MySQL 8 database
   pool = prefork
   worker_precheck = False

   [scheduler]
   job_heartbeat_sec = 5
   scheduler_heartbeat_sec = 5
   num_runs = -1
   parsing_processes = 2
   ```
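
   Spelled out, the broker/result settings point Celery at kombu's SQLAlchemy broker transport and Celery's database result backend, which would look roughly like this (the `sqla+` and `db+` URL prefixes are Celery's documented selectors for those backends; the credentials, host, and database names here are made-up placeholders, not my real values):

   ```ini
   [celery]
   # kombu's SQLAlchemy broker transport (documented as experimental by Celery)
   broker_url = sqla+mysql://user:pass@mysql-host/airflow
   # Celery's database result backend on the same MySQL 8 instance
   result_backend = db+mysql://user:pass@mysql-host/airflow
   ```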
   
   ### Operating System
   
   Fedora EL7
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow==2.2.5
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-ftp==2.1.0
   apache-airflow-providers-http==2.1.0
   apache-airflow-providers-imap==2.2.1
   apache-airflow-providers-mysql==2.2.1
   apache-airflow-providers-sqlite==2.1.1
   ```
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.