mrexhepi opened a new issue, #43449:
URL: https://github.com/apache/airflow/issues/43449

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.9.3
   
   ### What happened?
   
   Whenever I start a DAG, I get a timeout in the scheduler and the task fails with the errors below:
   
   ```
   [2024-10-28T20:03:57.467+0000] {scheduler_job_runner.py:417} INFO - 1 tasks up for execution:
   <TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [scheduled]>
   [2024-10-28T20:03:57.468+0000] {scheduler_job_runner.py:480} INFO - DAG edgetag_currency has 0/16 running and queued tasks
   [2024-10-28T20:03:57.469+0000] {scheduler_job_runner.py:596} INFO - Setting the following tasks to queued state:
   <TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [scheduled]>
   [2024-10-28T20:03:57.479+0000] {scheduler_job_runner.py:639} INFO - Sending TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1) to executor with priority 1 and queue default
   [2024-10-28T20:03:57.480+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'edgetag_currency', 'task_pull', 'manual__2024-10-28T19:57:46.193365+00:00', '--local', '--subdir', 'DAGS_FOLDER/dags/currency_pull.py']
   [2024-10-28T20:03:58.482+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
   [2024-10-28T20:03:58.491+0000] {celery_executor.py:279} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
   [2024-10-28T20:03:59.567+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
   [2024-10-28T20:03:59.571+0000] {celery_executor.py:279} INFO - [Try 2 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
   [2024-10-28T20:04:01.715+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
   [2024-10-28T20:04:01.720+0000] {celery_executor.py:279} INFO - [Try 3 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
   [2024-10-28T20:04:03.529+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
   [2024-10-28T20:04:03.533+0000] {celery_executor.py:290} ERROR - Error sending Celery task: Timeout, PID: 7
   Celery Task ID: TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py", line 32, in __call__
       return self.__value__
              ^^^^^^^^^^^^^^
   AttributeError: 'ChannelPromise' object has no attribute '__value__'. Did you mean: '__call__'?

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 951, in create_channel
       return self._avail_channels.pop()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
   IndexError: pop from empty list

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 220, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/task.py", line 594, in apply_async
       return app.send_task(
              ^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py", line 801, in send_task
       amqp.send_task_message(P, name, message, **options)
     File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/amqp.py", line 518, in send_task_message
       ret = producer.publish(
             ^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 186, in publish
       return _publish(
              ^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 556, in _ensured
       return fun(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 195, in _publish
       channel = self.channel
                 ^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 218, in _get_channel
       channel = self._channel = channel()
                                 ^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py", line 34, in __call__
       value = self.__value__ = self.__contract__()
                                ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 234, in <lambda>
       channel = ChannelPromise(lambda: connection.default_channel)
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 953, in default_channel
       self._ensure_connection(**conn_opts)
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 459, in _ensure_connection
       return retry_over_time(
              ^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
       return fun(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 934, in _connection_factory
       self._connection = self._establish_connection()
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 860, in _establish_connection
       conn = self.transport.establish_connection()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 975, in establish_connection
       self._avail_channels.append(self.create_channel(self))
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 953, in create_channel
       channel = self.Channel(connection)
                 ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/redis.py", line 744, in __init__
       self.client.ping()
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/commands/core.py", line 1217, in ping
       return self.execute_command("PING", **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py", line 542, in execute_command
       conn = self.connection or pool.get_connection(command_name, **options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 1109, in get_connection
       connection.connect()
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 288, in connect
       self.on_connect()
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 354, in on_connect
       auth_response = self.read_response()
                       ^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
       response = self._parser.read_response(disable_decoding=disable_decoding)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
       result = self._read_response(disable_decoding=disable_decoding)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
       raw = self._buffer.readline()
             ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
       self._read_from_socket()
     File "/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/socket.py", line 65, in _read_from_socket
       data = self._sock.recv(socket_read_size)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
       raise AirflowTaskTimeout(self.error_message)
   airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 7

   [2024-10-28T20:04:03.534+0000] {scheduler_job_runner.py:689} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)
   [2024-10-28T20:04:03.545+0000] {scheduler_job_runner.py:721} INFO - TaskInstance Finished: dag_id=edgetag_currency, task_id=task_pull, run_id=manual__2024-10-28T19:57:46.193365+00:00, map_index=-1, run_start_date=None, run_end_date=2024-10-28 19:57:57.429347+00:00, run_duration=None, state=queued, executor_state=failed, try_number=2, max_tries=1, job_id=None, pool=default_pool, queue=default, priority_weight=1, operator=_PythonDecoratedOperator, queued_dttm=2024-10-28 20:03:57.470287+00:00, queued_by_job_id=1, pid=None
   [2024-10-28T20:04:03.545+0000] {task_context_logger.py:91} ERROR - The executor reported that the task instance <TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
   [2024-10-28T20:04:04.619+0000] {dagrun.py:819} ERROR - Marking run <DagRun edgetag_currency @ 2024-10-28 19:57:46.193365+00:00: manual__2024-10-28T19:57:46.193365+00:00, state:running, queued_at: 2024-10-28 19:57:46.229834+00:00. externally triggered: True> failed
   [2024-10-28T20:04:04.620+0000] {dagrun.py:901} INFO - DagRun Finished: dag_id=edgetag_currency, execution_date=2024-10-28 19:57:46.193365+00:00, run_id=manual__2024-10-28T19:57:46.193365+00:00, run_start_date=2024-10-28 19:57:47.166462+00:00, run_end_date=2024-10-28 20:04:04.619902+00:00, run_duration=377.45344, state=failed, external_trigger=True, run_type=manual, data_interval_start=2024-10-28 08:00:00+00:00, data_interval_end=2024-10-28 16:00:00+00:00, dag_hash=eff246fcd6ccb6a38bec38f44b82f466
   
   ```
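
   For context on where the repeated `Timeout, PID: 7` comes from: per the traceback, the celery provider wraps the broker publish in Airflow's timeout context manager, so a slow Redis connect/AUTH handshake surfaces as `AirflowTaskTimeout` rather than as a Redis error. A minimal sketch of that pattern as I understand it (simplified from `send_task_to_executor` in `celery_executor_utils.py`; not the exact provider code):

   ```python
   # Simplified sketch, not the exact provider code: bound the Celery
   # publish with Airflow's SIGALRM-based timeout context manager.
   from airflow.exceptions import AirflowTaskTimeout
   from airflow.utils.timeout import timeout

   OPERATION_TIMEOUT = 1  # [celery] operation_timeout defaults to 1 second

   def send_task(task_to_run, command, queue):
       try:
           with timeout(seconds=OPERATION_TIMEOUT):
               # This is the call that hangs while redis-py waits for the
               # AUTH response, per the traceback above.
               return task_to_run.apply_async(args=[command], queue=queue)
       except AirflowTaskTimeout as e:
           return e  # the executor logs this and retries, hence "[Try 1 of 3]"
   ```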
   
   
   The timeout-related variables have already been extended beyond their default values.
   This is AWS ECS Fargate with three services (webserver, scheduler, and worker).
   
   
   Redis is 7.0.7 (I also tested with version 6).
   Postgres is version 14.
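
   As a basic diagnostic, broker reachability can be separated from the executor's 1-second timeout by probing Redis directly from the scheduler container. A minimal sketch using the redis-py client the celery provider depends on (the URL below is a placeholder for the real AIRFLOW__CELERY__BROKER_URL):

   ```python
   # Hypothetical diagnostic, not part of the deployment: check that this
   # container can TCP-connect and authenticate to the Redis broker.
   import redis

   client = redis.Redis.from_url(
       "redis://:<password>@<redis-host>:6379/0",  # placeholder broker URL
       socket_connect_timeout=5,
       socket_timeout=5,
   )
   print(client.ping())  # True only if connect + AUTH both complete in time
   ```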
   
   ### What you think should happen instead?
   
   The DAG should run successfully.
   
   ### How to reproduce
   
   Deploy Airflow 2.9.3 to AWS ECS Fargate and try to run a DAG.
   
   
   ### Operating System
   
   Windows/Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   AWS ECS Fargate
   
   ### Anything else?
   
   My current vars:
   AIRFLOW__CELERY__BROKER_URL=**********
   AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=600
   AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=180
   AIRFLOW__CORE__EXECUTOR=CeleryExecutor
   AIRFLOW__CORE__SQL_ALCHEMY_CONN=**********
   AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
   AIRFLOW__WEBSERVER__LOG_FETCH_DELAY_SEC=30
   AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=60
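
   One variable not set above that may be relevant (an assumption on my side, untested) is the celery provider's operation timeout, which defaults to 1 second and matches the gap between "Adding to queue" and the first "Process timed out" in the log:

   ```
   # Untested assumption: allow the scheduler more time to publish a task
   # to the Redis broker before AirflowTaskTimeout fires.
   AIRFLOW__CELERY__OPERATION_TIMEOUT=10
   ```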
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

